Source code for alchy.manager

"""Manager class and mixin.

The :class:`Manager` class helps manage a SQLAlchemy database session as well
as provide convenience functions for commons operations.


Configuration
-------------

The following configuration values can be passed into a new :class:`Manager`
instance as a ``dict``, ``class``, or ``module``.

===========================  ==================================================
``SQLALCHEMY_DATABASE_URI``  URI used to connect to the database. Defaults to
                             ``sqlite://``.
``SQLALCHEMY_BINDS``         A ``dict`` that maps bind keys to database URIs.
                             Optionally, in place of a database URI, a
                             configuration ``dict`` can be used to overrided
                             connection options.
``SQLALCHEMY_ECHO``          When ``True`` have SQLAlchemy echo all SQL
                             statements. Defaults to ``False``.
``SQLALCHEMY_POOL_SIZE``     The size of the database pool. Defaults to the
                             engine's default (usually ``5``).
``SQLALCHEMY_POOL_TIMEOUT``  Specifies the connection timeout for the pool.
                             Defaults to ``10``.
``SQLALCHEMY_POOL_RECYCLE``  Number of seconds after which a connection is
                             automatically recycled.
``SQLALCHEMY_MAX_OVERFLOW``  Controls the number of connections that can be
                             created after the pool reached its maximum size.
                             When those additional connections are returned to
                             the pool, they are disconnected and discarded.
===========================  ==================================================
"""

from functools import partial

import sqlalchemy
from sqlalchemy import orm
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm.exc import UnmappedError

from .model import make_declarative_base, extend_declarative_base
from .query import QueryModel
from .session import Session
from ._compat import string_types, itervalues


__all__ = [
    'ManagerMixin',
    'Manager',
    'Config',
]


[docs]class ManagerMixin(object): """Extensions for :attr:`Manager.session`."""
[docs] def add(self, *instances): """Override ``session.add()`` so it can function like ``session.add_all()``. Note: Supports chaining. """ for instance in instances: if isinstance(instance, list): self.add(*instance) else: self.session.add(instance) return self.session
[docs] def add_commit(self, *instances): """Add instances to session and commit in one call.""" self.add(*instances).commit()
[docs] def delete(self, *instances): """Override ``session.delete()`` so it can function like ``session.add_all()``. Note: Supports chaining. """ for instance in instances: if isinstance(instance, list): self.delete(*instance) else: self.session.delete(instance) return self.session
[docs] def delete_commit(self, *instances): """Delete instances to session and commit in one call.""" self.delete(*instances).commit()
[docs]class Manager(ManagerMixin): """Manager class for database session. Initialization of :class:`Manager` accepts a config object, session options, and an optional declarative base. If ``Model`` isn't provided, then a default one is generated using :func:`alchy.model.make_declarative_base`. The declarative base model is accessible at :attr:`Model`. By default the ``session_options`` are:: { 'query_cls': alchy.Query, 'autocommit': False, 'autoflush': True } The default :attr:`session_class` is :class:`alchy.Session`. If you want to provide your own session class, then it's suggested that you subclass :class:`alchy.Session` and pass it in via :attr:`session_class`. This way your subclass will inherit the functionality of :class:`alchy.Session`. """ def __init__(self, config=None, session_options=None, Model=None, session_class=None): #: Database engine configuration options. self.config = Config(defaults={ 'SQLALCHEMY_DATABASE_URI': 'sqlite://', 'SQLALCHEMY_BINDS': None, 'SQLALCHEMY_ECHO': False, 'SQLALCHEMY_POOL_SIZE': None, 'SQLALCHEMY_POOL_TIMEOUT': None, 'SQLALCHEMY_POOL_RECYCLE': None, 'SQLALCHEMY_MAX_OVERFLOW': None }) if isinstance(config, dict): self.config.update(config) elif config is not None: self.config.from_object(config) self._engines = {} self._binds = {} if session_options is None: session_options = {} session_options.setdefault('query_cls', QueryModel) session_options.setdefault('autocommit', False) session_options.setdefault('autoflush', True) #: Class to used for session object. self.session_class = session_class or Session #: Scoped session object. self.session = self.create_scoped_session(session_options) if Model is None: #: Declarative base model class. self.Model = make_declarative_base() else: self.Model = Model if self.Model: extend_declarative_base(self.Model, self.session) @property def metadata(self): """Return :attr:`Model` metadata object.""" return getattr(self.Model, 'metadata', None) @property def binds(self): """Returns config options for all binds.""" if not self._binds: self._binds = { None: self.config['SQLALCHEMY_DATABASE_URI'] } if self.config['SQLALCHEMY_BINDS']: self._binds.update(self.config['SQLALCHEMY_BINDS']) return self._binds @property def binds_map(self): """Returns a dictionary with a table->engine mapping. This is suitable for use in ``sessionmaker(binds=binds_map)``. """ binds = list(self.binds) retval = {} for bind in binds: engine = self.get_engine(bind) tables = self.get_tables_for_bind(bind) retval.update(dict((table, engine) for table in tables)) return retval @property def engine(self): """Return default database engine.""" return self.get_engine()
[docs] def create_engine(self, uri_or_config): """Create engine using either a URI or a config dict. If URI supplied, then the default :attr:`config` will be used. If config supplied, then URI in config will be used. """ if isinstance(uri_or_config, dict): uri = uri_or_config['SQLALCHEMY_DATABASE_URI'] config = uri_or_config else: uri = uri_or_config config = self.config options = engine_options_from_config(config) return sqlalchemy.create_engine(make_url(uri), **options)
[docs] def get_engine(self, bind=None): """Return engine associated with bind. Create engine if it doesn't already exist. """ if bind not in self._engines: assert bind in self.binds, ( 'Bind {0} is not specified. ' 'Set in SQLALCHEMY_BINDS configuration variable'.format(bind)) self._engines[bind] = self.create_engine(self.binds[bind]) return self._engines[bind]
[docs] def create_scoped_session(self, options=None): """Create scoped session which internally calls :meth:`create_session`. """ if options is None: # pragma: no cover options = {} return orm.scoped_session(partial(self.create_session, options))
[docs] def create_session(self, options): """Create session instance using custom Session class that supports multiple bindings. """ return self.session_class(self, **options)
[docs] def get_tables_for_bind(self, bind=None): """Returns a list of all tables relevant for a bind.""" return [table for table in itervalues(self.metadata.tables) if table.info.get('bind_key') == bind]
def _execute_for_all_tables(self, bind, operation, skip_tables=False): """Execute metadata operation for associated tables.""" if self.metadata is None: raise UnmappedError('Missing declarative base model') if bind == '__all__': binds = [None] + list(self.config.get('SQLALCHEMY_BINDS') or {}) elif isinstance(bind, string_types) or bind is None: binds = [bind] else: binds = bind for bind in binds: extra = {} if not skip_tables: tables = self.get_tables_for_bind(bind) extra['tables'] = tables metadata_operation = getattr(self.metadata, operation) metadata_operation(bind=self.get_engine(bind), **extra)
[docs] def create_all(self, bind='__all__'): """Create database schema from models.""" self._execute_for_all_tables(bind, 'create_all')
[docs] def drop_all(self, bind='__all__'): """Drop tables defined by models.""" self._execute_for_all_tables(bind, 'drop_all')
[docs] def reflect(self, bind='__all__'): """Reflect tables from database.""" self._execute_for_all_tables(bind, 'reflect', skip_tables=True)
[docs] def __getattr__(self, attr): """Delegate all other attributes to :attr:`session`.""" return getattr(self.session, attr)
[docs]class Config(dict): """Configuration loader which acts like a dict but supports loading values from an object limited to ``ALL_CAPS_ATTRIBUTES``. """ def __init__(self, defaults=None): super(Config, self).__init__(defaults or {})
[docs] def from_object(self, obj): """Pull ``dir(obj)`` keys from `obj` and set onto ``self``.""" for key in dir(obj): if key.isupper(): self[key] = getattr(obj, key)
def engine_options_from_config(config): """Return engine options derived from config object.""" options = {} def _setdefault(optionkey, configkey): """Set options key if config key is not None.""" if config.get(configkey) is not None: options[optionkey] = config[configkey] _setdefault('echo', 'SQLALCHEMY_ECHO') _setdefault('pool_size', 'SQLALCHEMY_POOL_SIZE') _setdefault('pool_timeout', 'SQLALCHEMY_POOL_TIMEOUT') _setdefault('pool_recycle', 'SQLALCHEMY_POOL_RECYCLE') _setdefault('max_overflow', 'SQLALCHEMY_MAX_OVERFLOW') return options