Подтвердить что ты не робот

Сельдерей. Может ли сообщение в RabbitMQ потреблять два или более рабочих одновременно?

Возможно, я глупо задаю вопрос, но перед тем, как продолжить работу, мне нужно окунуться в основные понятия.

Я обрабатываю несколько тысяч RSS-каналов, используя несколько рабочих узлов сельдерея и RabbitMQ node в качестве брокера. URL-адрес каждого фида записывается как сообщение в очереди. Работник просто считывает URL-адрес из очереди и начинает его обрабатывать. Я должен обеспечить, чтобы один RSS-канал не обрабатывался двумя рабочими одновременно.

Статья Обеспечение выполнения задачи выполняется только один за раз предлагает решение на основе Memcahced для блокировки фида при его обработке.

Но я пытаюсь понять, почему мне нужно использовать Memcached (или что-то еще), чтобы гарантировать, что сообщение в очереди RabbitMQ не будет потребляться несколькими рабочими одновременно. Есть ли какое-то изменение конфигурации в RabbitMQ (или Celery), которое я могу сделать для достижения этой цели?

4b9b3361

Ответ 1

Как отмечают другие, вы смешиваете яблоки и апельсины.

Быть задачей сельдерея и сообщением MQ.

Вы можете гарантировать, что сообщение будет обрабатываться только одним рабочим одновременно.

например.

@task(...)
def my_task(

my_task.apply(1)

.apply публикует сообщение используемому вами брокеру сообщений (кролик, redis...). Затем сообщение будет перенаправлено в очередь и будет потреблено одним работником. вам не нужна блокировка для этого, у вас есть это бесплатно:)

Пример в кулинарной книге сельдерея показывает, как предотвратить одновременное выполнение двух таких сообщений (my_task.apply(1)), это то, что вам нужно обеспечить в самой задаче.

Вам нужно что-то, что вы можете получить от всех работников, конечно (memcached, redis...), поскольку они могут работать на разных машинах.

Ответ 2

Одно сообщение MQ, конечно, не будет видно нескольким потребителям в нормальной рабочей настройке. Вам нужно будет выполнить некоторую работу по делам, связанным с сбоями/сбоями работников, чтением данных об ошибках auto-acks и сообщений, но основной случай звучит.

Я не вижу синхронизированную очередь (read: MQ) в связанной статье, поэтому (насколько я могу судить) они используют механизм блокировки (read: memcache) для синхронизации, как альтернатива. И я могу придумать несколько проблем, которые не были бы в правильной настройке MQ.

Ответ 3

Упомянутый пример обычно используется для других целей: он мешает вам работать с разными сообщениями с тем же значением (не одно и то же сообщение). Например, у меня есть два процесса: первый помещает в очередь некоторые URL-адреса, а второй - переносит URL из очереди и извлекает их. Что будет, если первый процесс ставит в очередь на один URL дважды (или даже больше)?

P.S. Я использую для этого Redis хранилище и setnx операцию (которая может устанавливать ключ только один раз).