Source code for alchy.query

"""Query subclass used by Manager as default session query class.
"""

from math import ceil

from sqlalchemy import orm, and_, or_, inspect
from sqlalchemy.orm.strategy_options import Load
from pydash import py_

from ._compat import iteritems


# Names exported by ``from alchy.query import *``.
__all__ = [
    'Query',
    'QueryModel',
    'QueryProperty',
    'Pagination',
    'LoadOption'
]


class Query(orm.Query):
    """Extension of default Query class used in SQLAlchemy session queries.
    """

    #: Default per_page argument for pagination when per_page not specified.
    DEFAULT_PER_PAGE = 50

    @property
    def entities(self):
        """Return list of entity classes present in query."""
        return [e.mapper.class_ for e in self._entities]

    @property
    def join_entities(self):
        """Return list of the joined entity classes present in query."""
        return [e.mapper.class_ for e in self._join_entities]

    @property
    def all_entities(self):
        """Return list of entities + join_entities present in query."""
        return self.entities + self.join_entities

    def _join_eager(self, keys, use_outerjoin, **kargs):
        """Helper method for applying ``join()``/``outerjoin()`` with
        ``contains_eager()``.

        Args:
            keys (sequence): Join path; ``keys[0]`` starts the eager load and
                each following key is chained onto it.
            use_outerjoin (bool): Use ``outerjoin()`` when truthy, otherwise
                ``join()``.

        Keyword Args:
            alias: Join alias or ``dict`` mapping key names to aliases. A
                non-dict value is treated as the alias for ``keys[0]``.
            options (list): :class:`LoadOption` items chained onto the end of
                the eager load.

        Returns:
            Query: Query with the joins and the eager-load option applied.
        """
        alias = kargs.pop('alias', {})
        options = kargs.pop('options', None)

        if not isinstance(alias, dict):
            # A bare alias applies to the first key only.
            alias = {keys[0]: alias}

        join_args = [(alias.get(key), key) for key in keys]

        load = orm.contains_eager(keys[0], alias=alias.get(keys[0]))

        for key in keys[1:]:
            load = load.contains_eager(key, alias=alias.get(key))

        if options:
            # Loader options are chained generatively, so keep the returned
            # object (same as ``_join_load``) instead of discarding it.
            load = apply_load_options(load, options)

        join = self.outerjoin if use_outerjoin else self.join

        return join(*join_args).options(load)
[docs] def join_eager(self, *keys, **kargs): """Apply ``join`` + ``self.options(contains_eager())``. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: alias: Join alias or ``dict`` mapping key names to aliases. options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. """ return self._join_eager(keys, False, **kargs)
[docs] def outerjoin_eager(self, *keys, **kargs): """Apply ``outerjoin`` + ``self.options(contains_eager())``. Args: keys (mixed): Either string keys or column references to join path(s). Keyword Args: alias: Join alias or ``dict`` mapping key names to aliases. options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. """ return self._join_eager(keys, True, **kargs)
def _join_load(self, keys, load_strategy, **kargs): """Helper method for returning load strategies.""" options = kargs.pop('options', None) load = getattr(orm, load_strategy)(keys[0], **kargs) for key in keys[1:]: load = getattr(load, load_strategy)(key) if options: load = apply_load_options(load, options) return self.options(load)
[docs] def joinedload(self, *keys, **kargs): """Apply ``joinedload()`` to `keys`. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. Note: Additional keyword args will be passed to initial load creation. """ return self._join_load(keys, 'joinedload', **kargs)
[docs] def immediateload(self, *keys, **kargs): """Apply ``immediateload()`` to `keys`. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. Note: Additional keyword args will be passed to initial load creation. """ return self._join_load(keys, 'immediateload', **kargs)
[docs] def lazyload(self, *keys, **kargs): """Apply ``lazyload()`` to `keys`. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. Note: Additional keyword args will be passed to initial load creation. """ return self._join_load(keys, 'lazyload', **kargs)
[docs] def noload(self, *keys, **kargs): """Apply ``noload()`` to `keys`. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. Note: Additional keyword args will be passed to initial load creation. """ return self._join_load(keys, 'noload', **kargs)
[docs] def subqueryload(self, *keys, **kargs): """Apply ``subqueryload()`` to `keys`. Args: keys (mixed): Either string or column references to join path(s). Keyword Args: options (list): A list of :class:`LoadOption` to apply to the overall load strategy, i.e., each :class:`LoadOption` will be chained at the end of the load. Note: Additional keyword args will be passed to initial load creation. """ return self._join_load(keys, 'subqueryload', **kargs)
[docs] def load_only(self, *columns): """Apply ``load_only()`` to query.""" obj, columns = get_load_options(*columns) return self.options(obj.load_only(*columns))
[docs] def defer(self, *columns): """Apply ``defer()`` to query.""" load, columns = get_load_options(*columns) for column in columns: load = load.defer(column) return self.options(load)
[docs] def undefer(self, *columns): """Apply ``undefer()`` to query.""" load, columns = get_load_options(*columns) for column in columns: load = load.undefer(column) return self.options(load)
[docs] def undefer_group(self, *names): """Apply ``undefer_group()`` to query.""" obj, names = get_load_options(*names) return self.options(obj.undefer_group(names[0]))
[docs] def chain(self): """Return pydash chaining instance with items returned by :meth:`all`. See Also: `pydash's <http://pydash.readthedocs.org/>`_ documentation on `chaining <http://pydash.readthedocs.org/en/latest/chaining.html>`_ """ return py_.chain(self.all())
[docs] def index_by(self, callback=None): """Index items returned by :meth:`all` using `callback`.""" return py_.index_by(self.all(), callback)
[docs] def map(self, callback=None): """Map `callback` to each item returned by :meth:`all`.""" return py_.map(self.all(), callback)
[docs] def reduce(self, callback=None, initial=None): """Reduce :meth:`all` using `callback`.""" return py_.reduce(self.all(), callback, initial)
[docs] def reduce_right(self, callback=None, initial=None): """Reduce reversed :meth:`all` using `callback`.""" return py_.reduce_right(self.all(), callback, initial)
[docs] def pluck(self, column): """Pluck `column` attribute values from :meth:`all` results and return as list. """ return py_.pluck(self.all(), column)
[docs] def page(self, page=1, per_page=None): """Return query with limit and offset applied for page.""" if per_page is None: per_page = self.DEFAULT_PER_PAGE return self.limit(per_page).offset((page - 1) * per_page)
[docs] def paginate(self, page=1, per_page=None, error_out=True): """Return :class:`Pagination` instance using already defined query parameters. """ if error_out and page < 1: raise IndexError if per_page is None: per_page = self.DEFAULT_PER_PAGE items = self.page(page, per_page).all() if not items and page != 1 and error_out: raise IndexError # No need to count if we're on the first page and there are fewer items # than we expected. if page == 1 and len(items) < per_page: total = len(items) else: total = self.order_by(None).count() return Pagination(self, page, per_page, total, items)
class QueryModel(Query):
    """Default query property class behind ``mymanager.query``,
    ``mymanager.session.query``, and ``MyModel.query``.

    Can be used in other libraries/implementations when creating a session::

        from sqlalchemy import orm

        from alchy import QueryModel
        # or if not using as query property
        # from alchy import Query

        session = orm.scoped_session(orm.sessionmaker())
        session.configure(query_cls=QueryModel)

    **NOTE:** If you don't plan to use the query class as a query property,
    then you can use the :class:`Query` class instead since it won't include
    features that only work within a query property context.

    Attributes:
        __search_filters__: All available search filter functions indexed by
            a canonical name which will be referenced in advanced/simple
            search. All filter functions should take a single value and
            return an SQLAlchemy filter expression, i.e.,
            ``{key: lambda value: Model.column_name.contains(value)}``
        __advanced_search__: Advanced search models search by named
            parameters. Generally found on advanced search forms where each
            field maps to a specific database field that will be queried
            against. If defined as a list, each item should be a key from
            :attr:`__search_filters__`. The matching
            :attr:`__search_filters__` function will be used in the query. If
            defined as a dict, it should have the same format as
            :attr:`__search_filters__`.
        __simple_search__: Simple search models search by phrase (like Google
            search). Defined like :attr:`__advanced_search__`.
        __order_by__: Default order-by to use when
            :attr:`alchy.model.ModelBase.query` used.
    """

    __search_filters__ = {}
    __advanced_search__ = []
    __simple_search__ = []
    __order_by__ = None

    @property
    def Model(self):
        """Return primary entity model class (the first of ``entities``)."""
        primary_entity = self.entities[0]
        return primary_entity
[docs] def get_search_filters(self, keys): """Return :attr:`__search_filters__` filtered by keys.""" if isinstance(keys, dict): return keys else: return dict([(key, self.__search_filters__[key]) for key in keys])
[docs] def advanced_filter(self, search_dict=None): """Return the compiled advanced search filter mapped to `search_dict`. """ if search_dict is None: # pragma: no cover search_dict = {} filter_funcs = self.get_search_filters(self.__advanced_search__) term_filters = [filter_funcs[key](value) for key, value in iteritems(search_dict) if key in filter_funcs] # All filters should match for an advanced search. return and_(*term_filters)
[docs] def simple_filter(self, search_terms=None): """Return the compiled simple search filter mapped to `search_terms`. """ if search_terms is None: # pragma: no cover search_terms = [] filter_funcs = self.get_search_filters(self.__simple_search__) # Only support AND'ing search terms together. Apply each simple search # filter to each search term and group them together. term_filters = [[func(term) for func in filter_funcs.values()] for term in search_terms] # Each item in term_filters is a list of filters applied to one of # the search terms contained in search_string. We need at least one # simple filter to match for each term. We need all search terms to # have at least simple filter match. return and_(*[or_(*filters) for filters in term_filters])
    def search(self, search_string=None, search_dict=None, **search_options):
        """Perform combination of simple/advanced searching with optional
        limit/offset support.

        Args:
            search_string (str): Phrase for the simple search. It is split on
                whitespace and the terms are passed to :meth:`simple_filter`.
            search_dict (dict): Named parameters passed to
                :meth:`advanced_filter`.

        Keyword Args:
            limit: Limit applied to the filtered primary-key query.
            offset: Offset applied to the filtered primary-key query.
            order_by: Single order-by or list/tuple of them. Defaults to
                :attr:`__order_by__`.

        Returns:
            Query: The base query joined to a subquery of matching primary
            keys, or the base query itself when no filter, ordering, limit or
            offset was applied.
        """
        search_options.setdefault('limit', None)
        search_options.setdefault('offset', None)
        search_options.setdefault('order_by', self.__order_by__)

        query = self

        # Apply search filtering and pagination to Model's primary keys so we
        # can use the query as a subquery. In order to properly handle
        # pagination, we can use a subquery so that the outer level joins
        # won't cause records to be excluded when they include *-to-many
        # relationships. For example, if we were returning a query of user +
        # user keywords (one-to-many), then for something like the first 25
        # users, we may actually have more than that many records since we're
        # joining on many records from the user keywords table.
        original = (self.lazyload('*')
                    .load_only(*self.Model.primary_attrs())
                    .distinct())

        # Use the original query so that we preserve joins and where
        # statements.
        model_query = original

        if self.whereclause is not None:
            # If our base query contains a whereclause, then we need to
            # complete the "transfer" of the base query's where statements to
            # model_query by wiping out the base query's criterion. i.e. We
            # only want to maintain selects and froms in the base query and
            # keep wheres in the model_query.

            # Call a generative query method that won't modify its state. This
            # is basically a no-op used to copy the query object and modify it
            # below. NOTE: There may be a better way to do this.
            query = query.filter()

            # Remove existing filters since they were transferred to the
            # model_query. This may seem kind of hacky but I don't know of a
            # better way to nullify the query object's where clause.
            query._criterion = None

        if search_string is not None:
            model_query = model_query.filter(
                self.simple_filter(search_string.split()))

        if search_dict is not None:
            model_query = model_query.filter(
                self.advanced_filter(search_dict))

        if search_options['order_by'] is not None:
            # Normalize a single order-by into a list so it can be unpacked.
            if not isinstance(search_options['order_by'], (list, tuple)):
                search_options['order_by'] = [search_options['order_by']]

            model_query = model_query.order_by(*search_options['order_by'])

        if search_options['limit'] is not None:
            model_query = model_query.limit(search_options['limit'])

        if search_options['offset'] is not None:
            model_query = model_query.offset(search_options['offset'])

        # Only join to the subquery when something above actually produced a
        # new query object.
        if model_query != original:
            subquery = model_query.subquery()
            query = query.join(
                subquery,
                join_subquery_on_columns(subquery, self.Model.primary_keys()))

        return query
class QueryProperty(object):
    """Descriptor giving a model access to query capabilities via
    :attr:`alchy.model.ModelBase.query`, which is equivalent to
    ``session.query(Model)``.
    """

    def __init__(self, session):
        self.session = session

    def __get__(self, model, Model):
        mapper = orm.class_mapper(Model)

        if not mapper:
            return None

        # Fall back to QueryModel (and remember it on the model class) when
        # no query class has been configured.
        if not getattr(Model, 'query_class', None):
            Model.query_class = QueryModel

        return Model.query_class(mapper, session=self.session())
##
# Pagination class and usage adapted from Flask-SQLAlchemy:
# https://github.com/mitsuhiko/flask-sqlalchemy
##
class LoadOption(object):
    """Chained load option to apply to a load strategy when calling
    :class:`Query` load methods.

    Example usage: ::

        qry = (db.session.query(Product)
               .join_eager('category',
                           options=[LoadOption('noload', 'images')]))

    This would result in the ``noload`` option being chained to the eager
    option for ``Product.category`` and is equivalent to: ::

        qry = (db.session.query(Product)
               .join('category')
               .options(contains_eager('category').noload('images')))

    Args:
        strategy (str): Name of the loader method to call on the load object.
        *args: Positional arguments passed to that method.
        **kargs: Keyword arguments passed to that method.
    """

    def __init__(self, strategy, *args, **kargs):
        self.strategy = strategy
        self.args = args
        self.kargs = kargs

    def __repr__(self):
        """Return a constructor-style representation for debugging."""
        parts = [repr(self.strategy)]
        parts.extend(repr(arg) for arg in self.args)
        # Sort keyword args so the representation is deterministic.
        parts.extend('{0}={1!r}'.format(key, value)
                     for key, value in sorted(self.kargs.items()))
        return '{0}({1})'.format(type(self).__name__, ', '.join(parts))
def get_load_options(*columns):
    """Split `columns` into an object exposing the loading API and the
    columns left to apply to a query load method.

    When ``columns[0]`` inspects as a mapper it is wrapped in ``Load`` and
    removed from the columns; when it already is a ``Load`` it is used
    directly and removed; otherwise the ``orm`` module is returned together
    with all of `columns`.
    """
    first = columns[0]
    inspected = inspect(first, raiseerr=False)

    if inspected and inspected.is_mapper:
        return (Load(first), columns[1:])

    if isinstance(first, Load):
        return (first, columns[1:])

    return (orm, columns)


def apply_load_options(load, options):
    """Chain each of `options` onto the base `load` object and return the
    final load.
    """
    for option in options:
        strategy = getattr(load, option.strategy)
        load = strategy(*option.args, **option.kargs)

    return load


def base_columns_from_subquery(subquery):
    """Return ``(column, base_columns)`` pairs for each subquery column."""
    pairs = []

    for column in subquery.c.values():
        # base_columns is a set, so convert it to a list.
        pairs.append((column, list(column.base_columns)))

    return pairs


def join_subquery_on_columns(subquery, columns):
    """Return join-on condition which maps subquery's columns to columns,
    or ``None`` when no subquery column matches.
    """
    # Joining to a subquery column with more than one base column is not
    # supported, so only single-base columns are considered.
    join_on = [
        subquery_column == base_columns[0]
        for subquery_column, base_columns
        in base_columns_from_subquery(subquery)
        if len(base_columns) == 1 and base_columns[0] in columns
    ]

    if not join_on:  # pragma: no cover
        return None

    return and_(*join_on)