Подтвердить что ты не робот

Django/Celery несколько очередей на localhost - маршрутизация не работает

Я следовал за документами по сельдерею, чтобы определить 2 очереди на моей машине разработчика.

Мои настройки сельдерея:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

я открыл два окна терминала в виртуальном состоянии моего проекта и выполнил следующие команды:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

что я получаю, так это то, что все задачи обрабатываются обеими очередями.

Моя цель - CELERY_ROUTES одну очередь для обработки только одной задачи, определенной в CELERY_ROUTES и очередь по умолчанию для обработки всех других задач.

Я также следовал за этим вопросом SO, rabbitmqctl list_queues возвращает rabbitmqctl list_queues celery 0, а выполнение rabbitmqctl list_bindings возвращает exchange celery queue celery [] rabbitmqctl list_bindings exchange celery queue celery []. Перезапуск сервера кролика ничего не изменил.

4b9b3361

Ответ 1

Хорошо, поэтому я понял это. Ниже приведены все мои настройки, настройки и способы использования сельдерея, для тех, кто может задаваться вопросом о том же, что и мой вопрос.

Настройки

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

Как запустить сельдерей?

terminal - вкладка 1:

celery -A proj worker -Q default -l debug -n default_worker

это запустит первый рабочий, который расходует задачи из очереди по умолчанию. ЗАМЕТКА! -n default_worker не является обязательным для первого работника, но является обязательным, если у вас есть какие-либо другие экземпляры сельдерея. Установка -n worker_name совпадает с [email protected]%h.

terminal - вкладка 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

это запустит второй рабочий, чтобы задачи потребителей из очереди фидов. Обратите внимание на -n feeds_worker, если вы работаете с -l debug (log level = debug), вы увидите, что оба рабочих синхронизируются между ними.

terminal - вкладка 3:

celery -A proj beat -l debug

это запустит бит, выполнив задания в соответствии с расписанием в CELERYBEAT_SCHEDULE. Мне не нужно было менять задачу или CELERYBEAT_SCHEDULE.

Например, так выглядит мой CELERYBEAT_SCHEDULE для задачи, которая должна идти в очередь фидов:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

Как вы можете видеть, нет необходимости добавлять 'options': {'routing_key': 'long_tasks'} или указать, в какую очередь он должен идти. Кроме того, если вам было интересно, почему Update является верхним, то это связано с его настраиваемой задачей, которая определяется как подклассы celery.Task.

Ответ 2

Я следовал вышеописанной реализации, но это привело к созданию нескольких обменов в моей настройке RabbitMQ. Насколько я понимаю, для прослушивания всех очередей и маршрутизации всех задач на основе ключа маршрутизации должен быть только один обмен.