Подтвердить что ты не робот

Ошибка "неизвестного тега доставки" возникает, когда я пытаюсь отправить сообщения Ack в RabbitMQ с помощью pika (python)

Я хочу обрабатывать сообщения в нескольких потоках, но я получаю ошибку во время выполнения этого кода:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

Описание ошибки:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

Версии: pika 0.9.5, rabbitMQ 2.6.1

4b9b3361

Ответ 1

Вероятно, проблема заключается в том, что вы устанавливаете no_ack=True следующим образом:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

И затем подтверждая сообщения:

channel.basic_ack(delivery_tag=args.delivery_tag)

Вы должны выбрать, хотите ли вы подтвердить или нет, и установить правильный параметр потребления.

Ответ 2

Для меня это было просто, что я сказал очереди, что я не собираюсь подтверждать, тогда я подтвердил.

Например, НЕПРАВИЛЬНО:

channel.basic_consume(callback, queue=queue_name, no_ack=True)

а затем в моем обратном вызове:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

ПРАВО:

channel.basic_consume(callback, queue=queue_name, no_ack=False)

Итог: если вы хотите подтвердить вручную, установите no_ack = False.

Из документов:

no_ack: (bool) если установлено значение True, будет использоваться режим автоматического подтверждения (см. http://www.rabbitmq.com/confirms.html)

Ответ 3

У меня нет исправления, но я могу проверить, что это происходит, используя BlockingConnection  адаптер.

Он постоянно возникает при подтверждении или отклонении сообщения, которое повторно отправляется в ответ на channel.basic_recover()

pika 0.9.5, rabbitMQ 2.2.0, python 2.7 и Erlang R14B01

Обходной путь, который у меня есть, - всегда указывать deliver_tag = 0

Я подозреваю, что это работает только в том случае, если сообщение, которое вы вызываете /nacking, является последним, которое вы прочитали (в потоке). Библиотека, которую я пишу, абстрагирует сообщение таким образом, что каждый из них может быть подтвержден независимо, что ломается с этим решением.

Может ли кто-нибудь подтвердить, было ли это исправлено или подтверждено кем-либо еще в команде pika? Или, может быть, это проблема с RabbitMQ?

Ответ 4

В коде есть ошибка. Вы делитесь каналом через потоки. Это не поддерживается pika (см. FAQ). У вас есть 2 варианта:

  • Определите флаг no_ack=True в basic_get(...) и не используйте объект канала в функции потока doWork(...)
  • Если вам нужно отправить сообщение ACK только после того, как вы закончили свою работу, то пусть основной поток (цикл while True:) обрабатывает сообщение ack (а не рабочий поток). Ниже приведена измененная версия вашего кода, которая делает это.

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty
    
    def doWork(body, args, channel, ack_queue):
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)
    
    def doAck(channel):
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                r = None
            if r is None:
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()
    
    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()
    
    while True:
        time.sleep(0.03)    
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue        
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
    

Ответ 5

После просмотра RabbitMQ - обновлена ​​до новой версии и получила много "Неизвестный тег доставки 1 PRECONDITION_FAILED"

Я изменил свое основное потребление, чтобы выглядеть так:

    consumer_tag = channel.basic_consume(
        message_delivery_event,
        no_ack=True,
        queue=queue,
    )

Это привело к возникновению описанной ошибки при первоначальных (но не повторенных) подтверждениях при указании тега доставки сообщений. Доставка была извлечена из структуры метода доставки сообщений.

Использование

    channel.basic_ack(delivery_tag=0)

также подавляет ошибку в этом случае

Глядя на http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html, кажется, что это может быть проблемой в RabbitMQ.

Ответ 6

Эта проблема возникает из-за того, что вы установили {noack: true}, но все еще пытаетесь отправить подтверждение.

Ответ 7

Вы также можете столкнуться с этой ошибкой, если вы пытаетесь подтвердить сообщение на другом канале, из которого оно было создано. Это может произойти, если вы закрываете или воссоздаете каналы.

Из документов: https://www.rabbitmq.com/confirms.html

Другой сценарий, в котором брокер будет жаловаться на "неизвестный тег доставки", - это когда подтверждение, положительное или отрицательное, предпринимается по каналу, отличному от того, по которому была получена доставка. Поставки должны быть подтверждены на том же канале.