Source code for alchy.utils

"""Generic utility functions used in package.

..
    >>> from alchy.utils import *
"""

import re
import warnings
from collections import Iterable

from sqlalchemy import Column
from sqlalchemy.exc import SAWarning
from sqlalchemy.ext.declarative import AbstractConcreteBase, declared_attr

from ._compat import string_types, itervalues, iteritems, classmethod_func


__all__ = [
    'is_sequence',
    'camelcase_to_underscore',
    'iterflatten',
    'flatten'
]


[docs]def is_sequence(obj): """Test if `obj` is an iterable but not ``dict`` or ``str``. Mainly used to determine if `obj` can be treated like a ``list`` for iteration purposes. """ return (isinstance(obj, Iterable) and not isinstance(obj, string_types) and not isinstance(obj, dict))
[docs]def camelcase_to_underscore(string): """Convert string from ``CamelCase`` to ``under_score``.""" return re.sub('((?<=[a-z0-9])[A-Z]|(?<!_)(?!^)[A-Z](?=[a-z]))', r'_\1', string).lower()
[docs]def iterflatten(items): """Return iterator which flattens list/tuple of lists/tuples:: >>> to_flatten = [1, [2,3], [4, [5, [6]], 7], 8] >>> assert list(iterflatten(to_flatten)) == [1,2,3,4,5,6,7,8] """ for item in items: if isinstance(item, (list, tuple)): for itm in iterflatten(item): yield itm else: yield item
[docs]def flatten(items): """Return flattened list of a list/tuple of lists/tuples:: >>> assert flatten([1, [2,3], [4, [5, [6]], 7], 8]) == [1,2,3,4,5,6,7,8] """ return list(iterflatten(items))
def iterunique(items): """Return iterator to find unique list while preserving the order.""" seen = [] for item in items: if item not in seen: seen.append(item) yield item def unique(items): """Return unique list while preserving the order. >>> assert unique([1, 2, 3, 1, 2, 3, 4]) == [1, 2, 3, 4] """ return list(iterunique(items)) def mapper_class(relation): """Return mapper class given an ORM relation attribute.""" return relation.property.mapper.class_ def get_mapper_class(model, field): """Return mapper class given ORM model and field string.""" return mapper_class(getattr(model, field)) def get_concrete_value(obj, cls, check_classmethod=False, check_callable=False): """Return a 'concrete' form of obj. If obj is a declared_attr, it is evaluated on cls. If obj is a classmethod and check_classmethod is True, it is evaluated on cls. If obj is callable and check_callable is True, it is evaluated. """ if isinstance(obj, declared_attr): return obj.fget(cls) elif check_classmethod and isinstance(obj, classmethod): return classmethod_func(obj)(cls) elif check_callable and callable(obj): return obj() else: return obj def merge_declarative_args(cls, global_config_key, local_config_key): """Merge declarative args for class `cls` identified by `global_config_key` and `local_config_key` into a consolidated (tuple, dict). """ configs = [base.__dict__.get(global_config_key) for base in reversed(cls.mro())] configs.append(cls.__dict__.get(local_config_key)) args = [] kargs = {} for obj in configs: obj = get_concrete_value(obj, cls, check_classmethod=True, check_callable=True) if not obj: continue if isinstance(obj, dict): kargs.update(obj) elif isinstance(obj[-1], dict): # Configuration object has a dict at the end so we assume the # passed in format was [item0, item1, ..., dict0]. args += obj[:-1] kargs.update(obj[-1]) else: args += obj # Make args unique in case base classes duplicated items. args = unique(args) return (args, kargs) def should_set_tablename(cls): """Check what values are set by a class and its bases to determine if a tablename should be automatically generated. The class and its bases are checked in order of precedence: the class itself then each base in the order they were given at class definition. Abstract classes do not generate a tablename, although they may have set or inherited a tablename elsewhere. If a class defines a tablename or table, a new one will not be generated. Otherwise, if the class defines a primary key, a new name will be generated. This supports: * Joined table inheritance without explicitly naming sub-models. * Single table inheritance. * Concrete table inheritance * Inheriting from mixins or abstract models. :param bases: base classes of new class :param dct: new class dict :return: True if tablename should be set """ dct = cls.__dict__ if '__tablename__' in dct or '__table__' in dct or '__abstract__' in dct: return False if AbstractConcreteBase in cls.__bases__: return False def is_primary_key_column(obj): obj = get_concrete_value(obj, cls) return isinstance(obj, Column) and obj.primary_key with warnings.catch_warnings(): warnings.filterwarnings("ignore", message='Unmanaged access of ' 'declarative attribute .*', category=SAWarning) for name, value in iteritems(dct): if is_primary_key_column(value): return True if '__mapper_args__' in dct: mapper_args = get_concrete_value(dct.get('__mapper_args__', {}), cls, check_classmethod=True, check_callable=True) is_concrete = mapper_args.get('concrete', False) else: mapper_args = get_concrete_value(dct.get('__global_mapper_args__', {}), cls, check_classmethod=True, check_callable=True) is_concrete = mapper_args.get('concrete', False) mapper_args = get_concrete_value(dct.get('__local_mapper_args__', {}), cls, check_classmethod=True, check_callable=True) is_concrete = mapper_args.get('concrete', is_concrete) names_to_ignore = set(dct.keys()) names_to_ignore.add('query') for base in cls.__bases__: if (not is_concrete) and (hasattr(base, '__tablename__') or hasattr(base, '__table__')): return False for k in base.mro(): for name, value in iteritems(k.__dict__): if not (name in names_to_ignore or (name.startswith('__') and name.endswith('__'))): if is_primary_key_column(value): return True else: names_to_ignore.add(name) return False