Подтвердить что ты не робот

Потребляйте несколько очередей в python/pika

Я пытаюсь создать пользователя, который будет подписываться на несколько очередей, а затем обрабатывать сообщения по мере их поступления.

Проблема заключается в том, что, когда в первой очереди присутствуют некоторые данные, они потребляют первую очередь и никогда не потребляют вторую очередь. Однако, когда первая очередь пуста, она переходит в следующую очередь и затем потребляет обе очереди одновременно.

Я сначала реализовал потоки, но хочу избегать этого, когда библиотека pika делает это для меня без особых сложностей. Ниже мой код:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
4b9b3361

Ответ 1

Одним из возможных решений является использование неблокирующего соединения и потребление сообщений.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(callback, queue='queue2')


parameters = pika.URLParameters('amqp://guest:[email protected]:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

Это будет подключаться к нескольким очередям и будет потреблять сообщения соответственно.

Ответ 2

Вероятнее всего, проблема заключается в том, что первый вызов выпустил Basic.Consume и уже получил сообщения из предварительно заполненной очереди до выхода второго вызова. Возможно, вам захочется попробовать установить счетчик предварительной выборки QoS на 1, что ограничит RabbitMQ отправкой вам более чем одного сообщения за раз.