Подтвердить что ты не робот

Почему у CELERY_ROUTES есть "очередь" и "routing_key"?

Мое понимание AMQP заключается в том, что сообщения имеют только следующие компоненты:

  • Тело сообщения
  • Клавиша маршрутизации
  • Обмен

Очереди привязаны к биржам. Сообщения не могут знать о очередях. Они просто отправляют на обмен, а затем на основе типа обмена и ключа маршрутизации сообщения направляются в одну или несколько очередей.

В сельдерей рекомендуемый способ маршрутизации задается с помощью настройки CELERY_ROUTES. Из документов CELERY_ROUTES...

Список маршрутизаторов или один маршрутизатор, используемый для маршрутизации задач в очереди. http://celery.readthedocs.org/en/latest/configuration.html#message-routing

И он включает пример...

Чтобы перенаправить задачу в очередь feed_tasks, вы можете добавить запись в CELERY_ROUTES:

CELERY_ROUTES = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    },
}

Но подождите минуту. Согласно AMQP, сообщения поступают только с ключом маршрутизации! Какая черта - это "очередь" там?

Кроме того, существует такое понятие очереди по умолчанию. Если вы вызываете задачу, которая не попадает на CELERY_ROUTES, она возвращается к CELERY_DEFAULT_QUEUE. Но опять же - в AMQP сообщения не знают о очередях. Разве это не должно быть ключом маршрутизации по умолчанию?

4b9b3361

Ответ 1

Верно ли, что на сельдерее есть немного путаницы, когда вы переходите в Очереди, вам нужно иметь в виду, что параметр очереди относится к объекту Queue Kombu Queue, а не к очереди AMQP, вы можете понять это прочитав этот извлечение из документов. Конечно, тот факт, что сельдерей создает очередь и обменивается с тем же именем, является источником путаницы в использовании параметра очереди. Всегда в документах вы можете прочитать этот абзац:

Если у вас есть другая очередь, но на другой обмен, который вы хотите добавить, просто укажите пользовательский тип обмена и обмена:

CELERY_QUEUES = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                       routing_key='image.compress'),
)

Таким образом, вы можете связать две разные очереди на одном и том же обмене. После маршрутизации задачи, используя только обмен и ключ, вы можете использовать класс Routers

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None

Подробнее здесь http://celery.readthedocs.org/en/latest/userguide/routing.html#routers