Подтвердить что ты не робот

Как использовать Flask-SQLAlchemy в задаче Сельдерея

Недавно я переключился на Celery 3.0. До этого я использовал Flask-Celery, чтобы интегрировать Celery with Flask. Хотя у него было много проблем, таких как скрытие некоторых мощных функциональных возможностей сельдерея, но это позволило мне использовать полный контекст приложения Flask и особенно Flask-SQLAlchemy.

В моих фоновых задачах я обрабатываю данные и ORM SQLAlchemy для хранения данных. Составитель Flask-Celery отказался от поддержки плагина. Плагин собирал экземпляр Flask в задаче, поэтому я мог иметь полный доступ к SQLAlchemy.

Я пытаюсь воспроизвести это поведение в файле tasks.py, но без успеха. Есть ли у вас какие-либо намеки на то, как достичь этого?

4b9b3361

Ответ 1

Обновление. С тех пор мы начали использовать лучший способ справиться с отладкой приложения и настроить его для каждой задачи на основе шаблона, описанного в более поздняя документация фляг.

extensions.py

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

После того, как вы настроили свое приложение таким образом, вы можете запускать и использовать сельдерей без необходимости явно запускать его из контекста приложения, так как все ваши задачи будут автоматически запускаться в контексте приложения, если это необходимо, и вы не не нужно явно беспокоиться о разрыве после задачи, что является важной проблемой для управления (см. другие ответы ниже).

Старый ответ ниже, все еще работает, но не так, как чистое решение

Я предпочитаю запускать весь сельдерей в контексте приложения, создавая отдельный файл, который вызывает celery.start() с контекстом приложения. Это означает, что ваш файл задач не должен быть усеян настройкой контекста и разрывами. Он также хорошо подходит к шаблону "приложение factory".

extensions.py

from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

tasks.py

from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()

app.py

from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()

Ответ 2

В файле tasks.py выполните следующие действия:

from main import create_app
app = create_app()

celery = Celery(__name__)
celery.add_defaults(lambda: app.config)

@celery.task
def create_facet(project_id, **kwargs):
    with app.test_request_context():
       # your code

Ответ 3

Я использовал ответ Павла Гиббса с двумя отличиями. Вместо task_postrun я использовал worker_process_init. И вместо .remove() я использовал db.session.expire_all().

Я не уверен на 100%, но из того, что я понимаю, как это работает, когда Celery создает рабочий процесс, все унаследованные/разделяемые сеансы db будут истекли, а SQLAlchemy создаст новые сеансы по требованию уникальные к этому рабочему процессу.

До сих пор, похоже, я исправил свою проблему. С решением Пола, когда один рабочий закончил и удалил сеанс, другой рабочий, использующий тот же сеанс, все еще выполнял свой запрос, поэтому db.session.remove() закрыл соединение во время его использования, предоставив мне "Потерянное соединение с MySQL сервер во время запроса".

Спасибо Павлу за то, что он меня направил в правильном направлении!

Никогда не думал, что это не сработало. Я закончил с аргументом в моем приложении Flask factory, чтобы не запускать db.init_app (приложение), если его назвал Селььель. Вместо этого рабочие назовут его после того, как сельдерей разбудит их. Теперь я вижу несколько соединений в моем списке процессов MySQL.

from extensions import db
from celery.signals import worker_process_init
from flask import current_app

@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(current_app)

Ответ 4

from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app

celery = Celery()

def get_celery_conf():
    config = import_string('src.settings')
    config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
    config['BROKER_URL'] = config['CELERY_BROKER_URL']
    return config

@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
    conf.update(get_celery_conf())

@worker_process_init.connect
def init_celery_flask_app(**kwargs):
    app = create_app()
    app.app_context().push()
  • Обновить конфигурацию сельдерея в celeryd init
  • Используйте флеш-приложение factory для инициализации всех расширений фляж, включая расширение SQLAlchemy.

Таким образом, мы можем поддерживать подключение к базе данных для каждого сотрудника.

Если вы хотите запустить задачу под флагом, вы можете подклассом Task.__call__:

class SmartTask(Task):

    abstract = True

    def __call__(self, *_args, **_kwargs):
        with self.app.flask_app.app_context():
            with self.app.flask_app.test_request_context():
                result = super(SmartTask, self).__call__(*_args, **_kwargs)
            return result

class SmartCelery(Celery):

    def init_app(self, app):
        super(SmartCelery, self).init_app(app)
        self.Task = SmartTask