Quickstart

Let’s see alchy in action. We’ll start with some model definitions.

from alchy import ModelBase, make_declarative_base
from sqlalchemy import orm, Column, types, ForeignKey

class Base(ModelBase):
    """Project-specific base class layered on top of alchy's ``ModelBase``."""
    # extend/override ModelBase if necessary
    pass

# Build the declarative base from our Base; every model below subclasses Model.
Model = make_declarative_base(Base=Base)

class User(Model):
    """A user record stored in the ``user`` table."""
    __tablename__ = 'user'

    # integer primary key; UserItem.user_id points at it via ForeignKey('user._id')
    _id = Column(types.Integer(), primary_key=True)
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    # the UserItem objects attached to this user
    items = orm.relationship('UserItem')

class UserItem(Model):
    """An item that belongs to a user."""
    # When no __tablename__ is defined,
    # one is autogenerated from the class name,
    # like this:
    #__tablename__ = 'user_item'

    _id = Column(types.Integer(), primary_key=True)
    # foreign key to the owning row's primary key in the 'user' table
    user_id = Column(types.Integer(), ForeignKey('user._id'))
    name = Column(types.String())

    # the User this item refers to
    user = orm.relationship('User')

Next, we need to interact with our database. For that we will use an alchy.manager.Manager.

from alchy import Manager

# Config can be either (1) dict, (2) class, or (3) module.
config = {
    # 'sqlite://' with no file path is SQLAlchemy's URL for an in-memory SQLite database
    'SQLALCHEMY_DATABASE_URI': 'sqlite://'
}

# Be sure to pass in our declarative base defined previously.
# This is needed so that Model.metadata operations like
# create_all(), drop_all(), and reflect() work.
db = Manager(config=config, Model=Model)

Create our database tables.

# Model.metadata operation: creates the tables for the models defined on Model
db.create_all()

Now, create some records.

# initialize using keyword args
user1 = User(name='Fred', email='fred@example.com')
# print('user1:', user1)

# ...or initialize using a dict
user2 = User({'name': 'Barney'})
# print('user2:', user2)

# update() accepts the same two forms: keyword args...
user2.update(email='barney@example.org')
# ...or a dict (this call replaces the email set on the previous line)
user2.update({'email': 'barney@example.com'})
# print('user2 updated:', user2)

Add them to the database.

# there are several options for adding records
# NOTE(review): the variants below are alternatives shown side by side; each
# one passes the same user1/user2 instances, so in practice pick one form.

# add and commit in one step using positional args
db.add_commit(user1, user2)

# ...or add/commit using a list
users = [user1, user2]
db.add_commit(users)

# ...or separate add and commit calls
db.add(user1, user2)
db.commit()

# ...or with a list
db.add(users)
db.commit()

# ...or separate adds and commit
db.add(user1)
db.add(user2)
db.commit()

Fetch a model instance and operate on it.

# create a user and persist it in one call
db.add_commit(User(name='Wilma', email='wilma@example.com'))

# fetch from database, passing user1's primary key value
user = User.get(user1._id)
# print('user:', user)

# convert to dict
user_dict = user.to_dict()
# print('user dict:', user_dict)

# ...or just pass object directly to dict()
user_dict = dict(user)

# make some changes
user.update(level=5)

# and refresh
user.refresh()

# or flush
user.flush()

# access the session that loaded the model instance
assert user.session() == db.object_session(user)

# NOTE(review): the three delete forms below are alternatives; as written they
# all target the same `user`, which the first form has already deleted --
# confirm before running them verbatim in sequence.

# delete user
user.delete()
db.commit()

# ...or via db
db.delete(user)
db.commit()

# ...or all-in-one step
db.delete_commit(user)

Query records from the database.

# add some more users, each created with a single (empty) UserItem attached
db.add_commit(
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()])
)

# there are several syntax options for querying records;
# the asserts below check that all of them return the same rows

# using db.session directly
# print('all users:', db.session.query(User).all())

# ...or using db directly (i.e. db.session proxy)
assert db.query(User).all() == db.session.query(User).all()

# ...or via query property on model class
assert User.query.all() == db.session.query(User).all()

Use features from the enhanced query class.

# base query used by the examples below: User joined to UserItem
q = User.query.join(UserItem)

# entities: the query exposes its selected, joined, and combined entities
assert q.entities == [User]
assert q.join_entities == [UserItem]
assert q.all_entities == [User, UserItem]

# paging: page N with per_page=S renders the same SQL as limit(S).offset((N-1) * S)
assert str(q.page(2, per_page=2)) == str(q.limit(2).offset((2-1) * 2))

# pagination: paginate() returns an object describing one page of results
# NOTE(review): the has_next/next_num asserts assume the joined query yields
# at least 5 rows, so that a third page exists -- confirm against your data.
page2 = q.paginate(2, per_page=2)
assert str(page2.query) == str(q)
assert page2.page == 2
assert page2.per_page == 2
assert page2.total == q.count()
assert page2.items == q.limit(2).offset((2-1) * 2).all()
assert page2.prev_num == 1
assert page2.has_prev == True
assert page2.next_num == 3
assert page2.has_next == True
# step to the neighbouring pages from the current one
page_1 = page2.prev()
page_3 = page2.next()

# searching

# ...extend class definitions to support advanced and simple searching
# each entry maps a search key to a function that builds a LIKE '%value%' filter
User.__advanced_search__ = User.__simple_search__ = {
    'user_email': lambda value: User.email.like('%{0}%'.format(value)),
    'user_name': lambda value: User.name.like('%{0}%'.format(value))
}

UserItem.__advanced_search__ = {
    'item_name': lambda value: UserItem.name.like('%{0}%'.format(value))
}

# presumably the positional string is matched through __simple_search__ and the
# dict through __advanced_search__ -- confirm against the API reference
search = User.query.search('example.com', {'user_name': 'wilma'})
# print('search:', str(search))
assert search.count() > 0

# entity loading: loading strategies applied to the User.items relationship
User.query.join_eager(User.items)
User.query.joinedload(User.items)
User.query.lazyload(User.items)
User.query.immediateload(User.items)
User.query.noload(User.items)
User.query.subqueryload(User.items)

# column loading: choose which columns are loaded or deferred
User.query.load_only('_id', 'name')
User.query.defer('email')
User.query.undefer('email') # if User.email deferred in class definition
User.query.undefer_group('group1', 'group2') # if deferred groups defined in class

# utilities
User.query.map(lambda user: user.level)
User.query.pluck('level')
User.query.index_by('email')
User.query.chain().value()
# count the users whose level is above 5; ``level`` is a nullable column, so
# treat a missing level as 0 instead of comparing None with an int (which
# raises TypeError on Python 3)
User.query.reduce(
    lambda result, user: result + 1 if (user.level or 0) > 5 else result,
    initial=0
)

For more details regarding the chaining API (i.e. Query.chain()), see the pydash documentation.

Utilize ORM events.

from alchy import events

class User(Model):
    """``User`` model redefined to demonstrate ORM event hooks."""
    __table_args__ = {
        # this is needed since we're replacing the ``User`` class defined above
        'extend_existing': True
    }

    _id = Column(types.Integer(), primary_key=True)
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    @events.before_insert_update()
    def validate(self, *args, **kwargs):
        """Validate model instance."""
        # do validation
        return

    @events.on_set('email')
    def on_set_email(self, value, oldvalue, initiator):
        """Reject an email value that another user row already holds.

        Registered for 'set' events on ``email``; the arguments are the new
        value, the previous value, and the event initiator.

        Raises:
            ValueError: if a row with a different ``_id`` has the same email.
        """
        duplicates = self.query.filter(
            User.email == value,
            User._id != self._id
        )
        if duplicates.count() > 0:
            raise ValueError('Email already exists in database')

user = User(email='one@example.com')
db.add_commit(user)

# Building a second user with the same email assigns ``email``, which is
# expected to trigger on_set_email and raise ValueError for the duplicate.
try:
    User(email=user.email)
except ValueError:
    # the duplicate was rejected, which is the outcome being demonstrated
    pass

Finally, clean up after ourselves.

# Model.metadata operation: drops the tables for the models defined on Model
db.drop_all()

See also

For further details, consult the API Reference.