Подтвердить что ты не робот

Как я могу настроить Celery для вызова пользовательской функции инициализации перед выполнением моих задач?

У меня есть проект Django, и я пытаюсь использовать Celery для отправки задач для фоновой обработки (http://ask.github.com/celery/introduction.html). Celery хорошо интегрируется с Django, и я смог отправить свои пользовательские задачи и получить результаты.

Единственная проблема заключается в том, что я не могу найти разумный способ выполнения пользовательской инициализации в процессе демона. Мне нужно вызвать дорогостоящую функцию, которая загружает много памяти, прежде чем я начну обрабатывать задачи, и я не могу позволить себе вызывать эту функцию каждый раз.

У кого-нибудь была эта проблема раньше? Любые идеи, как обойти это без изменения исходного кода Сельдерея?

Спасибо

4b9b3361

Ответ 1

Вы можете либо написать пользовательский загрузчик, либо использовать сигналы.

У загрузчиков есть метод on_task_init, который вызывается, когда задача должна быть выполнена, и on_worker_init, который вызывается основным процессом производства сельдерея и сельдерея.

Использование сигналов, вероятно, является самым простым, доступны следующие сигналы:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Отправляется, когда задача должна быть выполнена рабочим (или локально если используется apply/или если CELERY_ALWAYS_EAGER установлено).

  • task_postrun(task_id, task, args, kwargs, retval) Отправляется после выполнения задачи в тех же условиях, что и выше.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Вызывается при выполнении задачи (не подходит для длительных операций)

Дополнительные сигналы, доступные в 0.9.x(текущая ведущая ветвь на github):

  • worker_init()

    Вызывается при запуске celeryd (перед инициализацией задачи, поэтому, если на система, поддерживающая fork, любые изменения памяти будут скопированы в дочерний рабочие процессы).

  • worker_ready()

    Вызывается, когда celeryd может получать задания.

  • worker_shutdown()

    Вызывается при закрытии celeryd.

Здесь пример, который предварительно вычисляет что-то при первом запуске задачи:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

Если вы хотите, чтобы функция выполнялась для всех задач, просто пропустите аргумент sender.