Module geode.db

Expand source code
# ------------------------------------------------------------------------------
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
#   Modules
# ------------------------------------------------------------------------------

# Database
import sqlite3

# Geode
from geode.conf import Configuration

# Structure
from enum import Enum

# System
from os import W_OK
from os import access

# ------------------------------------------------------------------------------
#   Class
# ------------------------------------------------------------------------------

class Database(object):

    class Metadata(Enum):
        """ Represent metadata

        Names of the SQLite tables which describe the database itself
        """

        # Table read by check_integrity and generate_database to list the
        # tables stored in the database
        MASTER = "sqlite_master"


    class Condition(Enum):
        """ Represent SQL conditions

        Each value is the SQL operator written between the column name and
        the compared value when a WHERE clause is generated
        """

        # Keyword operators
        IN = "IN"
        LIKE = "LIKE"
        BETWEEN = "BETWEEN"

        # Ordering comparisons
        LESS = "<"
        LESSEQUAL = "<="
        GREATER = ">"
        GREATEREQUAL = ">="

        # Equality comparisons
        EQUAL = "="
        NOTEQUAL = "<>"


    class Type(Enum):
        """ Represent SQL type

        Each member name is a SQL type name and each value is the Python
        type associated with it, so a member can be retrieved from a Python
        type with a lookup by value
        """

        BLOB = memoryview
        INTEGER = int
        NULL = None
        REAL = float
        TEXT = str


    def __init__(self, **kwargs):
        """ Constructor

        Raises
        ------
        TypeError
            when one of the parameters type is not correct
        """

        # ----------------------------------------
        #   Variables
        # ----------------------------------------

        # Database instance
        self.__database = None
        self.__database_cursor = None

        # ----------------------------------------
        #   Database filepath
        # ----------------------------------------

        self.path = None

        if "path" in kwargs:
            self.path = kwargs["path"]

        if self.path is None:
            raise TypeError("Wrong type for path, expected pathlib.Path")

        # ----------------------------------------
        #   Database scheme
        # ----------------------------------------

        self.scheme = None

        if "scheme" in kwargs:
            self.scheme = Configuration(str(kwargs["scheme"]))

        if self.scheme is None:
            raise TypeError("Wrong type for scheme, expected pathlib.Path")

        # ----------------------------------------
        #   Check data
        # ----------------------------------------

        # Database file not exists and we cannot write in his folder
        if not self.path.exists() and not access(self.path.parent, W_OK):
            raise PermissionError("Cannot write into %s" % self.path.parent)


    def __del__(self):
        """ Destructor
        """

        if self.__database_cursor is not None:
            self.__database_cursor.close()


    def __parse_conditions(self, request, data):
        """ Conditions parser for SQL request

        Parameters
        ----------
        request : str
            SQL request which going to retrieve the condition
        data : list
            Conditions list

        Returns
        -------
        str
            New SQL request
        """

        conditions = list()

        for condition in data:

            if type(condition) is tuple or type(condition) is list:
                conditions.append(' '.join([
                    condition[0], condition[1].value, "\"%s\"" % condition[2]]))

            else:
                conditions.append(condition)

        if len(conditions) > 0:
            request += " WHERE %s" % " AND ".join(conditions)

        return request


    def __execute(self, request):
        """ Execute a SQL request

        Parameters
        ----------
        request : str
            SQL request
        """

        self.__database_cursor = self.__database.cursor()
        self.__database_cursor.execute("%s;" % request)

        # Save changes into database
        self.__database.commit()

        self.__database_cursor.close()


    def __query(self, request):
        """ Query a SQL request

        Parameters
        ----------
        request : str
            SQL request

        Returns
        -------
        list
            Request results
        """

        self.__database_cursor = self.__database.cursor()
        self.__database_cursor.execute("%s;" % request)

        # Retrieve results
        results = self.__database_cursor.fetchall()

        self.__database_cursor.close()

        return results


    def connect(self):
        """ Start the connection with the database
        """

        self.__database = sqlite3.connect(str(self.path))


    def check_integrity(self):
        """ Check database integrity with specified scheme

        This function check availabale tables and columns to detect missing
        contents

        Returns
        -------
        bool
            True if database integrity is good, False otherwise
        """

        tables = self.select(Database.Metadata.MASTER.value,
            "name", where=[("type", Database.Condition.EQUAL, "table")])

        # Check columns between database content and database scheme
        for table in self.scheme.sections():
            if not table in tables:
                return False

        # Check table columns
        for table in tables:
            if not self.pragma(table) == self.scheme.options(table):
                return False

        return True


    def generate_database(self):
        """ Generate database tables from specified scheme
        """

        tables = self.select(Database.Metadata.MASTER.value,
            "name", where=[("type", Database.Condition.EQUAL, "table")])

        for table in list(set(self.scheme.sections()) - set(tables)):
            data = dict()

            for key, value in self.scheme.items(table):
                data[key] = value

            self.create(table, data=data)


    def create(self, table, **kwargs):
        """ Create a new table into database

        Parameters
        ----------
        data : dict, optional
            Table columns

        Examples
        --------
        >>> db.create("accounts", data={"name": Database.Type.TEXT})
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "CREATE TABLE %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            columns = list()

            for item in kwargs["data"].items():
                columns.append(" ".join(item))

            if len(columns) > 0:
                request += " (%s)" % ", ".join(columns)

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def drop(self, table):
        """ Drop a specific table

        Parameters
        ----------
        table : str
            Table name

        Examples
        --------
        >>> db.drop("accounts")
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute("DROP TABLE IF EXISTS %s" % table)


    def alter(self, table, **kwargs):
        """ Modify a specific table

        Parameters
        ----------
        table : str
            Table name
        rename : str, optional
            New table name
        add : tuple, optional
            New column

        Examples
        --------
        >>> db.alter("accounts", rename="characters")
        >>> db.alter("accounts", add=("year", int))
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "ALTER TABLE %s" % table

        # Manage table renaming
        if "rename" in kwargs and type(kwargs["rename"]) is str:
            request += " RENAME TO %s" % kwargs["rename"]

        # Manage column adding
        if "add" in kwargs:

            if type(kwargs["add"]) is tuple or type(kwargs["add"]) is list:
                key, value = kwargs["add"][:2]

                request += " ADD COLUMN %s %s" % (key, Type(value).name)

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def pragma(self, table):
        """ Retrieve informations from a specific table

        Parameters
        ----------
        table : str
            Table name

        Returns
        -------
        list
            Table informations
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        result = list()

        for item in self.__query("PRAGMA table_info(%s)" % table):
            result.append(item[1])

        return result


    def select(self, table, columns, **kwargs):
        """ Retrieve data from a specific table

        Parameters
        ----------
        table : str
            Table name
        columns : list
            Columns to retriev
        where : list, optional
            Request conditions as tuples list

        Returns
        -------
        list
            Request results

        Examples
        --------
        >>> db.select("accounts", ["job"],
                where=[("name", Database.Condition.LIKE, "faythe")])
        {'job': 'trusted_advisor'}
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        if type(columns) is str:
            columns = columns.split()

        request = "SELECT %s FROM %s" % (", ".join(columns), table)

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        results = self.__query(request)

        if len(columns) == 1 and not '*' in columns:
            results = [index[0] for index in results]

        return results


    def insert(self, table, **kwargs):
        """ Insert a new entry into a specific table

        Parameters
        ----------
        table : str
            Table name
        data : dict
            Entry data

        Examples
        --------
        >>> db.insert("accounts", data={"name": "oscar", "job": "opponent"})
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "INSERT INTO %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            keys = list()
            values = list()

            for key, value in kwargs["data"].items():

                if type(value) is str:
                    value = "\"%s\"" % value

                keys.append(key)
                values.append(value)

            if len(keys) > 0 and len(values) > 0 and len(keys) == len(values):
                request += " (%s) VALUES (%s)" % (
                    ", ".join(keys), ", ".join(values))

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def update(self, table, **kwargs):
        """ Update an entry from a specific table

        Parameters
        ----------
        table : str
            Table name
        data : dict
            Entry data
        where : list
            Request conditions as tuples list

        Examples
        --------
        >>> db.update("accounts", data={"name": "eve", "job": "eavesdropper"},
                where=[("name", Database.Condition.EQUAL, "alice")])
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "UPDATE %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            columns = list()

            for key, value in kwargs["data"].items():

                if type(value) is str:
                    value = "\"%s\"" % value

                columns.append(" = ".join((key, value)))

            if len(columns) > 0:
                request += " SET %s" % ", ".join(columns)

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def delete(self, table, **kwargs):
        """ Delete an entry from a specific table

        Parameters
        ----------
        table : str
            Table name
        where : list
            Request conditions as tuples list

        Examples
        --------
        >>> db.delete("accounts",
                where=[("name", Database.Condition.EQUAL, "bob")])
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "DELETE FROM %s" % table

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)

Classes

class Database (**kwargs)

Constructor

Raises

TypeError
when the type of one of the parameters is not correct
Expand source code
class Database(object):

    class Metadata(Enum):
        """ Represent metadata

        Names of the SQLite tables which describe the database itself
        """

        # Table read by check_integrity and generate_database to list the
        # tables stored in the database
        MASTER = "sqlite_master"


    class Condition(Enum):
        """ Represent SQL conditions

        Each value is the SQL operator written between the column name and
        the compared value when a WHERE clause is generated
        """

        # Keyword operators
        IN = "IN"
        LIKE = "LIKE"
        BETWEEN = "BETWEEN"

        # Ordering comparisons
        LESS = "<"
        LESSEQUAL = "<="
        GREATER = ">"
        GREATEREQUAL = ">="

        # Equality comparisons
        EQUAL = "="
        NOTEQUAL = "<>"


    class Type(Enum):
        """ Represent SQL type

        Each member name is a SQL type name and each value is the Python
        type associated with it, so a member can be retrieved from a Python
        type with a lookup by value
        """

        BLOB = memoryview
        INTEGER = int
        NULL = None
        REAL = float
        TEXT = str


    def __init__(self, **kwargs):
        """ Constructor

        Raises
        ------
        TypeError
            when one of the parameters type is not correct
        """

        # ----------------------------------------
        #   Variables
        # ----------------------------------------

        # Database instance
        self.__database = None
        self.__database_cursor = None

        # ----------------------------------------
        #   Database filepath
        # ----------------------------------------

        self.path = None

        if "path" in kwargs:
            self.path = kwargs["path"]

        if self.path is None:
            raise TypeError("Wrong type for path, expected pathlib.Path")

        # ----------------------------------------
        #   Database scheme
        # ----------------------------------------

        self.scheme = None

        if "scheme" in kwargs:
            self.scheme = Configuration(str(kwargs["scheme"]))

        if self.scheme is None:
            raise TypeError("Wrong type for scheme, expected pathlib.Path")

        # ----------------------------------------
        #   Check data
        # ----------------------------------------

        # Database file not exists and we cannot write in his folder
        if not self.path.exists() and not access(self.path.parent, W_OK):
            raise PermissionError("Cannot write into %s" % self.path.parent)


    def __del__(self):
        """ Destructor
        """

        if self.__database_cursor is not None:
            self.__database_cursor.close()


    def __parse_conditions(self, request, data):
        """ Conditions parser for SQL request

        Parameters
        ----------
        request : str
            SQL request which going to retrieve the condition
        data : list
            Conditions list

        Returns
        -------
        str
            New SQL request
        """

        conditions = list()

        for condition in data:

            if type(condition) is tuple or type(condition) is list:
                conditions.append(' '.join([
                    condition[0], condition[1].value, "\"%s\"" % condition[2]]))

            else:
                conditions.append(condition)

        if len(conditions) > 0:
            request += " WHERE %s" % " AND ".join(conditions)

        return request


    def __execute(self, request):
        """ Execute a SQL request

        Parameters
        ----------
        request : str
            SQL request
        """

        self.__database_cursor = self.__database.cursor()
        self.__database_cursor.execute("%s;" % request)

        # Save changes into database
        self.__database.commit()

        self.__database_cursor.close()


    def __query(self, request):
        """ Query a SQL request

        Parameters
        ----------
        request : str
            SQL request

        Returns
        -------
        list
            Request results
        """

        self.__database_cursor = self.__database.cursor()
        self.__database_cursor.execute("%s;" % request)

        # Retrieve results
        results = self.__database_cursor.fetchall()

        self.__database_cursor.close()

        return results


    def connect(self):
        """ Start the connection with the database
        """

        self.__database = sqlite3.connect(str(self.path))


    def check_integrity(self):
        """ Check database integrity with specified scheme

        This function check availabale tables and columns to detect missing
        contents

        Returns
        -------
        bool
            True if database integrity is good, False otherwise
        """

        tables = self.select(Database.Metadata.MASTER.value,
            "name", where=[("type", Database.Condition.EQUAL, "table")])

        # Check columns between database content and database scheme
        for table in self.scheme.sections():
            if not table in tables:
                return False

        # Check table columns
        for table in tables:
            if not self.pragma(table) == self.scheme.options(table):
                return False

        return True


    def generate_database(self):
        """ Generate database tables from specified scheme
        """

        tables = self.select(Database.Metadata.MASTER.value,
            "name", where=[("type", Database.Condition.EQUAL, "table")])

        for table in list(set(self.scheme.sections()) - set(tables)):
            data = dict()

            for key, value in self.scheme.items(table):
                data[key] = value

            self.create(table, data=data)


    def create(self, table, **kwargs):
        """ Create a new table into database

        Parameters
        ----------
        data : dict, optional
            Table columns

        Examples
        --------
        >>> db.create("accounts", data={"name": Database.Type.TEXT})
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "CREATE TABLE %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            columns = list()

            for item in kwargs["data"].items():
                columns.append(" ".join(item))

            if len(columns) > 0:
                request += " (%s)" % ", ".join(columns)

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def drop(self, table):
        """ Drop a specific table

        Parameters
        ----------
        table : str
            Table name

        Examples
        --------
        >>> db.drop("accounts")
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute("DROP TABLE IF EXISTS %s" % table)


    def alter(self, table, **kwargs):
        """ Modify a specific table

        Parameters
        ----------
        table : str
            Table name
        rename : str, optional
            New table name
        add : tuple, optional
            New column

        Examples
        --------
        >>> db.alter("accounts", rename="characters")
        >>> db.alter("accounts", add=("year", int))
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "ALTER TABLE %s" % table

        # Manage table renaming
        if "rename" in kwargs and type(kwargs["rename"]) is str:
            request += " RENAME TO %s" % kwargs["rename"]

        # Manage column adding
        if "add" in kwargs:

            if type(kwargs["add"]) is tuple or type(kwargs["add"]) is list:
                key, value = kwargs["add"][:2]

                request += " ADD COLUMN %s %s" % (key, Type(value).name)

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def pragma(self, table):
        """ Retrieve informations from a specific table

        Parameters
        ----------
        table : str
            Table name

        Returns
        -------
        list
            Table informations
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        result = list()

        for item in self.__query("PRAGMA table_info(%s)" % table):
            result.append(item[1])

        return result


    def select(self, table, columns, **kwargs):
        """ Retrieve data from a specific table

        Parameters
        ----------
        table : str
            Table name
        columns : list
            Columns to retriev
        where : list, optional
            Request conditions as tuples list

        Returns
        -------
        list
            Request results

        Examples
        --------
        >>> db.select("accounts", ["job"],
                where=[("name", Database.Condition.LIKE, "faythe")])
        {'job': 'trusted_advisor'}
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        if type(columns) is str:
            columns = columns.split()

        request = "SELECT %s FROM %s" % (", ".join(columns), table)

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        results = self.__query(request)

        if len(columns) == 1 and not '*' in columns:
            results = [index[0] for index in results]

        return results


    def insert(self, table, **kwargs):
        """ Insert a new entry into a specific table

        Parameters
        ----------
        table : str
            Table name
        data : dict
            Entry data

        Examples
        --------
        >>> db.insert("accounts", data={"name": "oscar", "job": "opponent"})
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "INSERT INTO %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            keys = list()
            values = list()

            for key, value in kwargs["data"].items():

                if type(value) is str:
                    value = "\"%s\"" % value

                keys.append(key)
                values.append(value)

            if len(keys) > 0 and len(values) > 0 and len(keys) == len(values):
                request += " (%s) VALUES (%s)" % (
                    ", ".join(keys), ", ".join(values))

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def update(self, table, **kwargs):
        """ Update an entry from a specific table

        Parameters
        ----------
        table : str
            Table name
        data : dict
            Entry data
        where : list
            Request conditions as tuples list

        Examples
        --------
        >>> db.update("accounts", data={"name": "eve", "job": "eavesdropper"},
                where=[("name", Database.Condition.EQUAL, "alice")])
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "UPDATE %s" % table

        # Manage data
        if "data" in kwargs and type(kwargs["data"]) is dict:
            columns = list()

            for key, value in kwargs["data"].items():

                if type(value) is str:
                    value = "\"%s\"" % value

                columns.append(" = ".join((key, value)))

            if len(columns) > 0:
                request += " SET %s" % ", ".join(columns)

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)


    def delete(self, table, **kwargs):
        """ Delete an entry from a specific table

        Parameters
        ----------
        table : str
            Table name
        where : list
            Request conditions as tuples list

        Examples
        --------
        >>> db.delete("accounts",
                where=[("name", Database.Condition.EQUAL, "bob")])
        """

        if self.__database is None:
            raise RuntimeError("Database has not been loaded")

        # ----------------------------------------
        #   Generate request
        # ----------------------------------------

        request = "DELETE FROM %s" % table

        # Manage conditions
        if "where" in kwargs and type(kwargs["where"]) is list:
            request = self.__parse_conditions(request, kwargs["where"])

        # ----------------------------------------
        #   Start request
        # ----------------------------------------

        self.__execute(request)

Class variables

var Condition

Represent SQL conditions

Expand source code
class Condition(Enum):
    """ Represent SQL conditions

    Each value is the SQL operator written between the column name and
    the compared value when a WHERE clause is generated
    """

    # Keyword operators
    IN = "IN"
    LIKE = "LIKE"
    BETWEEN = "BETWEEN"

    # Ordering comparisons
    LESS = "<"
    LESSEQUAL = "<="
    GREATER = ">"
    GREATEREQUAL = ">="

    # Equality comparisons
    EQUAL = "="
    NOTEQUAL = "<>"
var Metadata

Represent metadata

Expand source code
class Metadata(Enum):
    """ Represent metadata

    Names of the SQLite tables which describe the database itself
    """

    # Table which lists the tables stored in the database
    MASTER = "sqlite_master"
var Type

Represent SQL type

Expand source code
class Type(Enum):
    """ Represent SQL type

    Each member name is a SQL type name and each value is the Python
    type associated with it, so a member can be retrieved from a Python
    type with a lookup by value
    """

    BLOB = memoryview
    INTEGER = int
    NULL = None
    REAL = float
    TEXT = str

Methods

def alter(self, table, **kwargs)

Modify a specific table

Parameters

table : str
Table name
rename : str, optional
New table name
add : tuple, optional
New column

Examples

>>> db.alter("accounts", rename="characters")
>>> db.alter("accounts", add=("year", int))
Expand source code
def alter(self, table, **kwargs):
    """ Modify a specific table

    Parameters
    ----------
    table : str
        Table name
    rename : str, optional
        New table name
    add : tuple, optional
        New column as (name, type), where type is a Database.Type member
        or the Python type associated with such a member

    Raises
    ------
    RuntimeError
        when the database connection has not been opened
    ValueError
        when the type of the new column is unknown from Database.Type

    Examples
    --------
    >>> db.alter("accounts", rename="characters")
    >>> db.alter("accounts", add=("year", int))
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    # ----------------------------------------
    #   Generate request
    # ----------------------------------------

    request = "ALTER TABLE %s" % table

    # Manage table renaming
    rename = kwargs.get("rename")

    if isinstance(rename, str):
        request += " RENAME TO %s" % rename

    # Manage column adding
    add = kwargs.get("add")

    if isinstance(add, (tuple, list)):
        key, value = add[:2]

        # Type is nested in Database, so it must be reached through the
        # class: a bare Type is not a name available from a method
        request += " ADD COLUMN %s %s" % (key, Database.Type(value).name)

    # ----------------------------------------
    #   Start request
    # ----------------------------------------

    self.__execute(request)
def check_integrity(self)

Check database integrity with specified scheme

This function checks the available tables and columns to detect missing contents

Returns

bool
True if database integrity is good, False otherwise
Expand source code
def check_integrity(self):
    """ Compare the database structure with the expected scheme

    The tables currently stored in the database are listed first. Every
    section of the scheme must be one of them, and the column names reported
    for every stored table must be equal to the options of the scheme section
    with the same name.

    Returns
    -------
    bool
        True when both checks succeed, False otherwise
    """

    existing = self.select(Database.Metadata.MASTER.value,
        "name", where=[("type", Database.Condition.EQUAL, "table")])

    # A scheme section without a matching table means missing content
    if any(section not in existing for section in self.scheme.sections()):
        return False

    # NOTE(review): this walks the stored tables, not the scheme sections, so
    # a table unknown to the scheme is also handed to scheme.options() -
    # confirm how the scheme object reacts to an unknown section
    return all(
        self.pragma(name) == self.scheme.options(name) for name in existing)
def connect(self)

Start the connection with the database

Expand source code
def connect(self):
    """ Open the sqlite3 connection used by the other methods

    The connection targets the string form of self.path and is stored on the
    instance.
    """

    location = str(self.path)

    self.__database = sqlite3.connect(location)
def create(self, table, **kwargs)

Create a new table into database

Parameters

data : dict, optional
Table columns

Examples

>>> db.create("accounts", data={"name": Database.Type.TEXT})
Expand source code
def create(self, table, **kwargs):
    """ Create a new table into database

    Parameters
    ----------
    table : str
        Table name
    data : dict, optional
        Table columns, with the column name as key and the column type as
        value. The type is either a string written as is into the request
        (like "TEXT") or an enumeration member (like Database.Type.TEXT)
        whose name is used

    Raises
    ------
    RuntimeError
        When the database connection has not been started
    TypeError
        When a column name or type is neither a string nor an enumeration
        member

    Examples
    --------
    >>> db.create("accounts", data={"name": Database.Type.TEXT})
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    # ----------------------------------------
    #   Generate request
    # ----------------------------------------

    request = "CREATE TABLE %s" % table

    # Manage data
    data = kwargs.get("data")
    if isinstance(data, dict):
        columns = list()

        for name, kind in data.items():

            # An enumeration member cannot be joined as text, its name is the
            # SQL keyword to write
            if isinstance(kind, Enum):
                kind = kind.name

            columns.append(" ".join((name, kind)))

        if columns:
            request += " (%s)" % ", ".join(columns)

    # ----------------------------------------
    #   Start request
    # ----------------------------------------

    self.__execute(request)
def delete(self, table, **kwargs)

Delete an entry from a specific table

Parameters

table : str
Table name
where : list
Request conditions as tuples list

Examples

>>> db.delete("accounts",
        where=[("name", Database.Condition.EQUAL, "bob")])
Expand source code
def delete(self, table, **kwargs):
    """ Remove entries from a specific table

    Parameters
    ----------
    table : str
        Table name
    where : list, optional
        Request conditions as tuples list. The request is sent without any
        condition when this argument is missing or is not a list

    Raises
    ------
    RuntimeError
        When the database connection has not been started

    Examples
    --------
    >>> db.delete("accounts",
            where=[("name", Database.Condition.EQUAL, "bob")])
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    conditions = kwargs.get("where")

    statement = "DELETE FROM %s" % table

    # Only a real list of conditions is appended to the statement
    if type(conditions) is list:
        statement = self.__parse_conditions(statement, conditions)

    self.__execute(statement)
def drop(self, table)

Drop a specific table

Parameters

table : str
Table name

Examples

>>> db.drop("accounts")
Expand source code
def drop(self, table):
    """ Remove a specific table from the database

    The request uses IF EXISTS.

    Parameters
    ----------
    table : str
        Table name

    Raises
    ------
    RuntimeError
        When the database connection has not been started

    Examples
    --------
    >>> db.drop("accounts")
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    statement = "DROP TABLE IF EXISTS %s" % table

    self.__execute(statement)
def generate_database(self)

Generate database tables from specified scheme

Expand source code
def generate_database(self):
    """ Create every table of the scheme which is missing from the database

    The tables currently stored in the database are listed, then each scheme
    section without a matching table is created with the items of this
    section as columns.
    """

    existing = self.select(Database.Metadata.MASTER.value,
        "name", where=[("type", Database.Condition.EQUAL, "table")])

    # Scheme sections which have no table yet
    missing = set(self.scheme.sections()) - set(existing)

    for name in missing:
        columns = {key: value for key, value in self.scheme.items(name)}

        self.create(name, data=columns)
def insert(self, table, **kwargs)

Insert a new entry into a specific table

Parameters

table : str
Table name
data : dict
Entry data

Examples

>>> db.insert("accounts", data={"name": "oscar", "job": "opponent"})
Expand source code
def insert(self, table, **kwargs):
    """ Insert a new entry into a specific table

    Parameters
    ----------
    table : str
        Table name
    data : dict
        Entry data, with the column name as key. Values can be str, int,
        float, bool or None

    Raises
    ------
    RuntimeError
        When the database connection has not been started
    TypeError
        When a value has a type which cannot be written as a SQL literal

    Examples
    --------
    >>> db.insert("accounts", data={"name": "oscar", "job": "opponent"})
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    # ----------------------------------------
    #   Generate request
    # ----------------------------------------

    request = "INSERT INTO %s" % table

    # Manage data
    data = kwargs.get("data")
    if isinstance(data, dict):
        keys = list()
        values = list()

        for key, value in data.items():

            if value is None:
                literal = "NULL"

            # Text uses single quotes (double quotes denote identifiers in
            # SQL) and any quote inside the text is doubled to escape it
            elif isinstance(value, str):
                literal = "'%s'" % value.replace("'", "''")

            # bool is checked before int since it is a subclass of int
            elif isinstance(value, bool):
                literal = str(int(value))

            elif isinstance(value, (int, float)):
                literal = str(value)

            else:
                raise TypeError("Cannot use %s value for column %s" % (
                    type(value).__name__, key))

            keys.append(key)
            values.append(literal)

        if keys:
            request += " (%s) VALUES (%s)" % (
                ", ".join(keys), ", ".join(values))

    # ----------------------------------------
    #   Start request
    # ----------------------------------------

    self.__execute(request)
def pragma(self, table)

Retrieve information from a specific table

Parameters

table : str
Table name

Returns

list
Table information
Expand source code
def pragma(self, table):
    """ List the columns of a specific table

    A PRAGMA table_info request is sent for the table and the second field of
    every returned row is kept.

    Parameters
    ----------
    table : str
        Table name

    Returns
    -------
    list
        Second field of each row, in the order given by the request

    Raises
    ------
    RuntimeError
        When the database connection has not been started
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    rows = self.__query("PRAGMA table_info(%s)" % table)

    return [row[1] for row in rows]
def select(self, table, columns, **kwargs)

Retrieve data from a specific table

Parameters

table : str
Table name
columns : list
Columns to retrieve
where : list, optional
Request conditions as tuples list

Returns

list
Request results

Examples

>>> db.select("accounts", ["job"],
        where=[("name", Database.Condition.LIKE, "faythe")])

['trusted_advisor']

Expand source code
def select(self, table, columns, **kwargs):
    """ Read data from a specific table

    Parameters
    ----------
    table : str
        Table name
    columns : list or str
        Columns to retrieve. A string is split on whitespace to get the
        columns list
    where : list, optional
        Request conditions as tuples list, ignored when it is not a list

    Returns
    -------
    list
        Request results. When exactly one column other than '*' is asked,
        only the first field of each row is returned

    Raises
    ------
    RuntimeError
        When the database connection has not been started

    Examples
    --------
    >>> db.select("accounts", ["job"],
            where=[("name", Database.Condition.LIKE, "faythe")])
    ['trusted_advisor']
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    if type(columns) is str:
        columns = columns.split()

    statement = "SELECT %s FROM %s" % (", ".join(columns), table)

    conditions = kwargs.get("where")

    # Only a real list of conditions is appended to the statement
    if type(conditions) is list:
        statement = self.__parse_conditions(statement, conditions)

    rows = self.__query(statement)

    # A single named column gives one-field rows, so they are flattened
    if len(columns) == 1 and '*' not in columns:
        return [row[0] for row in rows]

    return rows
def update(self, table, **kwargs)

Update an entry from a specific table

Parameters

table : str
Table name
data : dict
Entry data
where : list
Request conditions as tuples list

Examples

>>> db.update("accounts", data={"name": "eve", "job": "eavesdropper"},
        where=[("name", Database.Condition.EQUAL, "alice")])
Expand source code
def update(self, table, **kwargs):
    """ Update an entry from a specific table

    Parameters
    ----------
    table : str
        Table name
    data : dict
        Entry data, with the column name as key. Values can be str, int,
        float, bool or None
    where : list
        Request conditions as tuples list. Every entry of the table is
        updated when this argument is missing or is not a list

    Raises
    ------
    RuntimeError
        When the database connection has not been started
    TypeError
        When a value has a type which cannot be written as a SQL literal

    Examples
    --------
    >>> db.update("accounts", data={"name": "eve", "job": "eavesdropper"},
            where=[("name", Database.Condition.EQUAL, "alice")])
    """

    if self.__database is None:
        raise RuntimeError("Database has not been loaded")

    # ----------------------------------------
    #   Generate request
    # ----------------------------------------

    request = "UPDATE %s" % table

    # Manage data
    data = kwargs.get("data")
    if isinstance(data, dict):
        columns = list()

        for key, value in data.items():

            if value is None:
                literal = "NULL"

            # Text uses single quotes: a double-quoted text which matches a
            # column name would be read as this column and not as a text.
            # Any quote inside the text is doubled to escape it
            elif isinstance(value, str):
                literal = "'%s'" % value.replace("'", "''")

            # bool is checked before int since it is a subclass of int
            elif isinstance(value, bool):
                literal = str(int(value))

            elif isinstance(value, (int, float)):
                literal = str(value)

            else:
                raise TypeError("Cannot use %s value for column %s" % (
                    type(value).__name__, key))

            columns.append(" = ".join((key, literal)))

        if columns:
            request += " SET %s" % ", ".join(columns)

    # Manage conditions
    conditions = kwargs.get("where")
    if isinstance(conditions, list):
        request = self.__parse_conditions(request, conditions)

    # ----------------------------------------
    #   Start request
    # ----------------------------------------

    self.__execute(request)