Подтвердить что ты не робот

Как отправить периодические задания в определенную очередь в Сельдерей

По умолчанию сельдерей отправляет все задачи в очередь "сельдерей", но вы можете изменить это поведение, добавив дополнительный параметр:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

Настройки планировщика:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

Запустить сотрудника:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

Эта схема работает не так, как ожидалось: эти рабочие отправляют периодические задачи в очередь "сельдерей", а не "celery_periodic". Как я могу это исправить?

P.S. сельдерея == 3.0.16

4b9b3361

Ответ 1

Я нашел решение этой проблемы:

1) Прежде всего я изменил способ настройки периодических задач. Я использовал @periodic_task декоратор:

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options={'queue': 'celery_periodic'})
def recalc_last_hour():
    dt = datetime.utcnow()
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
                - timedelta(hours=1)
    log.debug('Generating task for hour %s', str(prev_hour))
    recalc_hour.delay(prev_hour)

2) Я написал celery_periodic дважды в параметрах @periodic_task:

  • queue = 'celery_periodic' используется, когда вы вызываете задачу из кода (.delay или .apply_async)

  • options = {'queue': 'celery_periodic'} опция используется, когда бит celery вызывает ее.

Я уверен, то же самое возможно, если вы будете настраивать периодические задачи с помощью переменной CELERYBEAT_SCHEDULE.

UPD. Это решение подходит как для баз данных, так и для файлов на основе CELERYBEAT_SCHEDULER.

Ответ 2

Периодичность отправляется в очереди с помощью celerybeat. Вы можете делать все, что мы делаем с помощью Celery api. Вот список конфигураций поставляется с celerybeat.

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

В вашем случае

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test,
        'options': {'queue' : 'celery_periodic'} ##options are mapped to apply_async options
    },
}

Ответ 3

И если вы используете планировщик базы данных djcelery, вы можете указать очередь в поле "Параметры выполнения → очередь"