Подтвердить что ты не робот

Сигналов или триггеров в SQLAlchemy

У SQLAlchemy есть что-то похожее на концепцию сигнала Django? В принципе, я хотел бы запустить несколько функций, когда я предварительно сохраняю или сохраняю некоторые объекты объектов. Спасибо.

Изменить: Я просто хочу эквивалент django-сигналов в SQLAlchemy.

4b9b3361

Ответ 2

Вы не уточнили, интегрируете ли вы SQLAlchemy и Django, или просто хотите эквивалент django-сигналов в SQLAlchemy.

Если вы хотите получить эквивалент сигналов Django, таких как post_save, pre_save, pre_delete и т.д., я бы назвал вас страницей,

sqlalchemy.orm.interfaces.MapperExtension

Ответ 3

Возможно, вы захотите рассмотреть sqlalchemy.orm.SessionExtension, а также

Вот код, который я собрал вместе, чтобы установить идентификатор владельца в экземпляре и установить update_date, который выполнит работу в приложении pylons. класс OrmExt - это место, где происходит вся магия. И init_model - это то место, где вы подключаете его.

import logging
import sqlalchemy as sa
from sqlalchemy import orm

from pylons import session

import datetime

log = logging.getLogger(__name__)

class ORMSecurityException(Exception):
    '''
    thrown for security violations in orm layer
    '''
    pass

def _get_current_user():
    log.debug('getting current user from session...')
    log.debug(session)
    return session['user'] 

def _is_admin(user):
    return False  


def set_update_date(instance):

    if hasattr(instance,'update_date'):
    instance.update_date = datetime.datetime.now()

def set_owner(instance):
    '''
    if owner_id, run it through the rules
    '''
    log.info('set_owner')
    if hasattr(instance, 'owner_id'):
    log.info('instance.owner_id=%s' % instance.owner_id)
    u = _get_current_user()
    log.debug('user: %s' % u.email)
    if not u:
        #anonymous users can't save owned objects
        raise ORMSecurityException()
    if instance.owner_id==None:
        #must be new object thus, owned by current user
        log.info('setting owner on object %s for user: %s' % (instance.__class__.__name__,u.email))
        instance.owner_id = u.id
    elif instance.owner_id!=u.id and not _is_admin(u):
        #if owner_id does not match user_id and user is not admin VIOLATION
        raise ORMSecurityException()
    else:
        log.info('object is already owned by this user')
        return #good to go
else:
    log.info('%s is not an owned object' % instance.__class__.__name__)
    return

def instance_policy(instance):
    log.info('setting owner for %s' % instance.__class__.__name__)
    set_owner(instance)
    log.info('setting update_date for %s' % instance.__class__.__name__)
    set_update_date(instance)


class ORMExt(orm.SessionExtension):
    '''
    attempt at managing ownership logic on objects
    '''
    def __init__(self,policy):
        self._policy = policy

    def before_flush(self,sqlsess,flush_context,instances):
        '''
        check all instances for owner_id==user.id
        '''
        try:
            for instance in sqlsess.deleted:
                try:
                    log.info('running policy for deleted %s' % instance.__class__.__name__)
                    self._policy(instance)
                except Exception,ex:
                    log.error(ex)
                    raise ex

            for instance in sqlsess.new:
                try:
                    log.info('running policy for new %s' % instance.__class__.__name__)
                    self._policy(instance)
                except Exception,ex:
                    log.error(ex)
                    raise ex

            for instance in sqlsess.dirty:
                try:
                    if sqlsess.is_modified(instance,include_collections=False,passive=True):
                        log.info('running policy for updated %s' % instance.__class__.__name__)
                        self._policy(instance)
                except Exception, ex:
                    log.error(ex)
                    raise ex

        except Exception,ex:
            sqlsess.expunge_all()
            raise ex

def init_model(engine):
    """Call me before using any of the tables or classes in the model"""
    sm = orm.sessionmaker(autoflush=True, autocommit=True, bind=engine,extension=ORMExt(instance_policy))
    meta.engine = engine
    meta.Session = orm.scoped_session(sm)

Ответ 4

Здесь я беру эту проблему, она использует Louie для отправки сигналов:

dispatch.py

"""
Signals dispatching for SQLAlchemy mappers.
"""

import louie
from sqlalchemy.orm.interfaces import MapperExtension
import signals


class LouieDispatcherExtension(MapperExtension):
    """
    Dispatch signals using louie on insert, update and delete actions.
    """

    def after_insert(self, mapper, connection, instance):
        louie.send(signals.after_insert, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_insert(mapper,
                connection, instance)

    def after_delete(self, mapper, connection, instance):
        louie.send(signals.after_delete, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_delete(mapper,
                connection, instance)

    def after_update(self, mapper, connection, instance):
        louie.send(signals.after_update, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).after_update(mapper,
                connection, instance)

    def before_delete(self, mapper, connection, instance):
        louie.send(signals.before_delete, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_delete(mapper,
                connection, instance)

    def before_insert(self, mapper, connection, instance):
        louie.send(signals.before_insert, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_insert(mapper,
                connection, instance)

    def before_update(self, mapper, connection, instance):
        louie.send(signals.before_update, instance.__class__,
                instance=instance)
        return super(LouieDispatcherExtension, self).before_update(mapper,
                connection, instance)

signals.py

from louie import Signal


class after_delete(Signal): pass
class after_insert(Signal): pass
class after_update(Signal): pass
class before_delete(Signal): pass
class before_insert(Signal): pass
class before_update(Signal): pass

Использование образца:

class MyModel(DeclarativeBase):

    __mapper_args__ = {"extension": LouieDispatcherExtension()}

    ID = Column(Integer, primary_key=True)
    name = Column(String(255))

def on_insert(instance):
    print "inserted %s" % instance

louie.connect(on_insert, signals.after_insert, MyModel)

Ответ 5

Вы можете использовать внутренний класс MapperExtension:

class YourModel(db.Model):

    class BaseExtension(MapperExtension):

        def before_insert(self, mapper, connection, instance):
            # do something here

        def before_update(self, mapper, connection, instance):
            # do something here

    __mapper_args__ = { 'extension': BaseExtension() }

    # ....