Подтвердить что ты не робот

Как запросить сообщения в RabbitMQ

После того, как потребитель получит сообщение, потребитель/работник выполняет некоторые проверки, а затем вызывает веб-службу. На этом этапе, если какая-либо ошибка возникает или проверка не выполняется, мы хотим, чтобы сообщение вернулось в очередь, из которой она изначально была израсходована.

Я прочитал документацию RabbitMQ. Но я смущен различиями между методами reject, nack и cancel.

4b9b3361

Ответ 1

Короткий ответ:

Чтобы запросить конкретное сообщение, вы можете выбрать как basic.reject, так и basic.nack с флагом multiple, установленным в false.

Вызов

basic.consume также может привести к повторному доставке сообщений, если вы используете подтверждение сообщения, и есть недопустимое сообщение для потребителя в определенное время и выход потребителя без их использования.

basic.recover будет обновлять все неактивные сообщения на определенном канале.

Длинный ответ:

basic.reject и basic.nack обе служат с той же целью - отбросить или запросить сообщение, которое не может быть обработано конкретным потребителем (в данный момент, при определенных условиях или вообще). Основное различие между ними заключается в том, что basic.nack поддерживает обработку объемных сообщений, а basic.reject - нет.

Это различие, описанное в Отрицательные благодарности на официальном веб-сайте RabbitMQ:

Спецификация AMQP определяет метод basic.reject, который позволяет клиентам отклонять отдельные, доставленные сообщения, инструктируя брокера либо отбрасывать их, либо запрашивать их. К сожалению, basic.reject не поддерживает поддержку отрицательного подтверждения сообщений в массовом порядке.

Чтобы решить эту проблему, RabbitMQ поддерживает метод basic.nack, который предоставляет все функции basic.reject, в то время как также разрешает массовую обработку сообщений.

Чтобы отклонить сообщения массовыми, клиенты устанавливают флаг multiple метода basic.nack на true. Затем брокер отклонит все неподтвержденные, доставленные сообщения до и включая сообщение, указанное в поле delivery_tag метода basic.nack. В этом отношении basic.nack дополняет семантику насыщенного подтверждения basic.ack.

Обратите внимание, что метод basic.nack является расширением RabbitMQ, а метод basic.reject является частью спецификации AMQP 0.9.1.

Что касается метода basic.cancel, он использовал для уведомления сервера, что клиент останавливает потребление сообщений. Обратите внимание, что клиент может получить произвольный номер сообщения между методом basic.cancel, отправляющим получение ответа cancel-ok. Если подтверждение сообщения используется клиентом, и у него есть какие-либо не подтвержденные сообщения, они будут возвращены в очередь, из которой они изначально были израсходованы.

basic.recover имеет некоторые ограничения в RabbitMQ: it  - basic.recover с requeue = false  - basic.recover synchronicity

В дополнение к ошибкам в соответствии с спецификациями RabbitMQ basic.recover имеет частичную поддержку (восстановление с requeue = false не поддерживается.)

Примечание о basic.consume:

Когда basic.consume запускается без auto-ack (no­ack=false), и есть некоторые ожидающие сообщения неактивные сообщения, тогда, когда потребитель (отмирает, фатальная ошибка, исключение, что угодно), что ожидающие сообщения будут повторно отправлены. Технически, что ожидающие сообщения не будут обрабатываться (даже мертвые буквы), пока потребитель не выпустит их (ack/nack/reject/recover). Только после этого они будут обработаны (например, с помощью трюка).

Например, предположим, что мы отправляем изначально 5 сообщений подряд:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

И затем потребляйте 3 из них, но не ответите им, а затем отмените использование потребителя. У нас будет такая ситуация:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

где star (*) отмечает, что флаг redelivered установлен на true.

Предположим, что мы имеем ситуацию с мертвым буквенным набором и очередью для сообщений с мертвой буквой

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

И предположим, что мы отправляем сообщение 5 с атрибутом expire, установленным в 5000 (5 секунд):

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

и затем мы потребляем 3 сообщения из очереди main и удерживаем их в течение 10 секунд:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

где восклицательный знак (!) обозначает сообщение без упаковки. Такие сообщения не могут быть доставлены никому из потребителей, и их обычно нельзя просматривать на панели управления. Но позвольте отменить потребителя, помните, что он все еще содержит 3 неактивных сообщения:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

Итак, теперь, когда 3 сообщения, которые были в голове, возвращаются в исходную очередь, но поскольку у них есть TTL-сообщение для каждого сообщения, они помещаются в хвост очереди с мертвой буквой (конечно, посредством обмена мертвой буквой).

P.S:.

Потребление сообщения, а также прослушивание нового, каким-то образом отличается от прямого доступа к очереди (получение одного или нескольких сообщений без заботы о других). Подробнее см. basic.get.