Подтвердить что ты не робот

Как я могу восстановить неподтвержденные сообщения AMQP из других каналов, чем мое подключение?

Кажется, чем дольше я работаю на сервере rabbitmq, тем больше проблем у меня с неподтвержденными сообщениями. Я хотел бы их повторить. На самом деле, похоже, для этого требуется команда amqp, но она применяется только к каналу, используемому вашим соединением. Я построил небольшую pika script, чтобы хотя бы попробовать, но я либо что-то пропустил, либо не может быть сделано так (как насчет с rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
4b9b3361

Ответ 1

Неподтвержденные сообщения - это сообщения, которые были доставлены по сети потребителю, но еще не были отклонены или отклонены, но этот потребитель еще не закрыл канал или соединение, из которого он был первоначально получен. Поэтому брокер не может понять, есть ли у потребителя достаточно времени для обработки этих сообщений или если он забыл о них. Таким образом, он оставляет их в непризнанном состоянии до тех пор, пока либо потребитель не умрет, либо не получит ответ или отклонен.

Поскольку эти сообщения все еще могут быть корректно обработаны в будущем потребителем все еще живого, который изначально их потреблял, вы не можете (насколько мне известно) вставить другого друга в микс и попытаться принять внешние решения о них. Вам нужно исправить своих потребителей, чтобы принимать решения по каждому сообщению, когда они обрабатываются, а не оставлять старые сообщения неподтвержденными.

Ответ 2

Если сообщения не удалены, есть два способа вернуть их в очередь:

  1. basic.nack

    Эта команда заставит сообщение быть помещенным обратно в очередь и повторно отправлено.

  2. Отключиться от брокера

    Это действие заставит все необработанные сообщения с этого канала вернуться в очередь.

ПРИМЕЧАНИЕ: basic.recover будет пытаться переиздавать незакрепленные сообщения на одном канале (одному и тому же потребителю), что иногда является желаемым поведением.

Спецификация RabbitMQ для basic.recover и basic.nack


Реальный вопрос: почему сообщения непризнанные?

Возможные сценарии:

  • Потребитель получает слишком много сообщений, а затем не обрабатывает и не подбрасывает их достаточно быстро.

    Решение. Предварительно выберите как можно меньше сообщений.

  • Багги-клиентская библиотека (у меня есть эта проблема в настоящее время с pika 0.9.13. Если в очереди много сообщений, определенное количество сообщений застрянет без присмотра, даже через несколько часов.

    Решение. Я должен перезагрузить пользователя несколько раз, пока все незакрепленные сообщения не исчезнут из очереди.

Ответ 3

Все неподтвержденные сообщения перейдут в состояние готовности, когда все рабочие/потребители будут остановлены.

Убедитесь, что все работники остановлены, подтвердив вывод grep on ps aux и остановив/убив их, если они найдены.

Если вы управляете сотрудниками с помощью супервизора, который показывает, что рабочий остановлен, вы можете проверить наличие зомби. Супервизор сообщает, что рабочий должен быть остановлен, но все же вы обнаружите, что процессы зомби работают, когда grepped на выходе ps aux. Убивание процессов зомби приведет сообщения обратно в состояние готовности.