Подтвердить что ты не робот

Как убрать сообщение в RabbitMQ?

У меня есть что-то вроде очереди на работу над RabbitMQ, и после запроса на отмену задания я хотел бы отменить задачи, которые еще не начали обрабатывать (их сообщения не были ack'd), что соответствует отводу этих сообщений из очередей, к которым они были направлены.

Я не нашел эту функциональность в AMQP или в RabbitMQ API; возможно, я не искал достаточно хорошо? Или мне придется использовать обходной путь (это не сложно, но все же)?

4b9b3361

Ответ 1

Я бы решил этот сценарий, попросив работника проверить какой-то авторитетный источник данных, чтобы определить, должно ли выполняться задание или нет. Например, рабочий проверит статус задания в базе данных, чтобы узнать, было ли задание уже отменено.

В сценариях, где скорость обработки заданий может быть быстрее, чем скорость, с которой авторитарный магазин может обновляться и считываться, может оказаться полезным менее гарантированное хранилище данных, которое торгует скоростью для других характеристик.

Примером этого может быть использование Redis в качестве хранилища для отмены обработки сообщения вместо реляционного DB, такого как MySQL. Redis работает очень быстро, но дает меньше гарантий относительно данных, которые он хранит, тогда как MySQL намного медленнее, но предлагает больше гарантий относительно данных, которые он хранит.

В конце концов, концепция проверки с другим источником для того, обрабатывать сообщение или нет, одинакова, но способ реализации, который зависит от вашего конкретного сценария.

Ответ 2

RabbitMQ не позволяет вам изменять или удалять сообщения после их размещения. Для этого вы хотите, чтобы какая-то база данных сохраняла состояние каждой работы и использовала RabbitMQ для уведомления заинтересованных сторон об изменениях в этом состоянии.

Для слабых томов вы можете клонировать его вместе с очередью на каждое задание. Создайте очередь, опубликуйте описание задания в очереди, объявите имя очереди рабочим. Если задание необходимо отменить до его обработки, удалите очередь заданий; когда рабочие приходят, чтобы получить описание работы, они заметят, что очередь исчезла.

Более легкий и, как правило, лучше использовать redis или другое хранилище ключей/значений для хранения состояния задания (с удаленной или отсутствующей записью, означающей отмененную или несуществующую работу) и использовать rabbitmq для уведомления о новых/удаленных/измененных записях в хранилище ключей/значений.

Ответ 3

Как минимум два способа достижения вашей цели:

  • basic.reject будет запрашивать сообщение, если установлено requeue=true (иначе оно отклонит сообщение).
    (поддерживается с RabbitMQ 2.0.0, см. http://www.rabbitmq.com/blog/2010/08/03/well-ill-let-you-go-basicreject-in-rabbitmq/).

  • basic.recover попросит брокера переадресовать незакрепленные сообщения на канал.

Ответ 4

Вам нужно подписаться на все очереди, на которые были перенаправлены сообщения, и использовать их с помощью ack.

Например, если вы публикуете на тему обмена с "тестом" в качестве ключа маршрутизации, и есть 3 постоянных очереди, которые подписываются на "тест", вам нужно будет использовать эти три очереди. Возможно, было бы лучше добавить еще одну очередь, которую также будут прослушивать ваши потребительские процессы, и сказать им игнорировать эти сообщения.

Альтернативой, поскольку вы используете RabbitMQ, является создание настраиваемого плагина обмена, который будет принимать некоторые внедиапазонные команды для очистки всех очередей. Например, у вас может быть этот обмен, который читает специальный заголовок сообщения, который сообщает ему очистить все очереди, которым предназначено это сообщение. Это требует написания кода Erlang, но есть 4 разных типа обмена, поэтому вам нужно будет скопировать наиболее похожий код и написать код для новых bahaviours. Если для этого используются только пользовательские заголовки, то тело сообщения может быть обычным сообщением для потребителей.

Подводя итог:

1) издателю необходимо самому использовать сообщения 2) издатель может отправить специальное сообщение в специальной очереди, чтобы сообщить потребителям игнорировать сообщение 3) издатель может отправить специальное сообщение на пользовательский обмен, который очистит все существующие сообщения от очередей до отправки этого специального сообщения потребителям.