Подтвердить что ты не робот

Временная очередь, сделанная в сельдерее

Я использую сельдерей с RabbitMQ. В последнее время я заметил, что создается большое количество временных очередей.

Итак, я экспериментировал и обнаружил, что когда задача выходит из строя (это задача вызывает исключение), тогда формируется временная очередь со случайным именем (например, c76861943b0a4f3aaa6a99a6db06952c), и очередь остается.

Некоторые свойства временной очереди, найденные в rabbitmqadmin, следующие:

auto_delete: True потребителей: 0 долговечный: False сообщений: 1 messages_ready: 1

И одна такая временная очередь выполняется каждый раз, когда задача терпит неудачу (то есть вызывает исключение). Как избежать этой ситуации? Потому что в моей производственной среде формируется большое количество таких очередей.

4b9b3361

Ответ 1

Ну, Филипп прямо здесь. Ниже приводится описание того, как я его решил. Это настройка в celeryconfig.py.

Я все еще использую CELERY_BACKEND = "amqp", как сказал Филипп. Но в дополнение к этому, я теперь использую CELERY_IGNORE_RESULT = True. Эта конфигурация гарантирует, что дополнительные очереди не будут сформированы для каждой задачи.

Я уже использовал эту конфигурацию, но все же, когда задача не удалась, была сформирована дополнительная очередь. Затем я заметил, что я использовал другую конфигурацию, которую нужно было удалить, которая была CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True. Что это сделало, что он не сохранил результаты для всех задач, а сделал только для ошибок (неудачных задач) и, следовательно, одной дополнительной очереди для задачи, которая не удалась.

Ответ 2

Похоже, вы используете amqp в качестве базы данных результатов. Из docs здесь вы найдете проблемы с использованием этой конкретной установки:

  • Каждая новая задача создает новую очередь на сервере, с тысячами задачи брокера могут быть перегружены очередями, и это повлияет на производительность в отрицательных направлениях. Если вы используете RabbitMQ, то каждый очередь будет отдельным процессом Erlang, поэтому, если вы планируете держите много результатов одновременно, вам может потребоваться увеличить Erlang
    предел процесса и максимальное количество файловых дескрипторов вашей ОС
    позволяет
  • Старые результаты не будут автоматически очищаться, поэтому вы должны сделать обязательно использовать результаты, иначе количество очередей будет в конечном итоге выходят из-под контроля. Если вы используете RabbitMQ 2.1.1 или выше вы можете использовать аргумент x-expires для очередей, которые истекают после очередного периода времени после того, как они неиспользованными. Истечение очереди может быть установлено (в секундах) на Параметр CELERY_AMQP_TASK_RESULT_EXPIRES (не включен по умолчанию).

Из того, что я читал в changelog, это больше не бэкэнда по умолчанию в версиях >= 2.3.0, потому что пользователи получали бит в таким образом. Я предлагаю изменить бэкэнд результатов, если это не функциональность, в которой вы нуждаетесь.

Ответ 3

CELERY_TASK_RESULT_EXPIRES задает время для жизни временных очередей. Значение по умолчанию - 1 день. Вы можете изменить это значение.

Ответ 4

Причина, по которой это происходит, заключается в том, что дистанционное управление работников сельдерей включено (по умолчанию оно включено).

Вы можете отключить его, установив для параметра CELERY_ENABLE_REMOTE_CONTROL значение False Однако обратите внимание, что вы потеряете возможность делать такие вещи, как add_consumer, cancel_consumer и т.д., Используя celery command

Ответ 5

amqp backend создает новую очередь для каждой задачи. Если вы хотите этого избежать, вы можете использовать бэкэнд rpc, который сохраняет результаты в одной очереди.

В вашей конфигурации установите

CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True

Подробнее о это в документах по сельдерей.