Подтвердить что ты не робот

Неблокирующий потребитель RabbitMQ

Я использую RabbitMQ в Python для управления несколькими очередями между производителем и несколькими потребителями. В примере на веб-сайте RabbitMQ (модель маршрутизации) потребители блокируются. Это означает, что они останавливаются на start_consuming() и выполняют функцию обратного вызова каждый раз, когда в очереди появляется новая "задача".

Мой вопрос: как я могу реализовать своего потребителя так, что он все еще ждет задач (так, функция обратного вызова вызывается каждый раз, когда в очереди появляются новые вещи), но в то же время он может выполнять другие работа/код.

Спасибо

4b9b3361

Ответ 1

Форма FAQ:

Пика не имеет никакого понятия о потоке в коде. Если хочешь используйте Pika с резьбой, убедитесь, что у вас есть соединение Pika поток, созданный в этой теме. Это не безопасно делить один Пика соединение между потоками,

Итак, давайте создадим соединение внутри потока:

import pika


class PikaMassenger():

    exchange_name = '...'

    def __init__(self, *args, **kwargs):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name, 
            exchange_type='topic')

    def consume(self, keys, callback):
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        for key in keys:
            self.channel.queue_bind(
                exchange=self.exchange_name, 
                queue=queue_name, 
                routing_key=key)

        self.channel.basic_consume(
            queue=queue_name, 
            on_message_callback=callback, 
            auto_ack=True)

        self.channel.start_consuming()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.close()

def start_consumer():

    def callback(ch, method, properties, body):
        print(" [x] %r:%r consumed" % (method.routing_key, body))

    with PikaMassenger() as consumer:
        consumer.consume(keys=[...], callback=callback)


consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.start()

Ответ 2

для приемника

import pika

messages = []
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='message')

def callback(ch, method, properties, message):
    print(message)
    messages.append(message)

channel.basic_consume(callback,queue='message',no_ack=True)

и

channel.basic_consume(callback,queue='message',no_ack=True)

когда вам нужно) или в потоке

import threading

import pika
import time

messages = []

def recieve_messages():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        messages.append(body)

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    # channel.start_consuming()
    mq_recieve_thread = threading.Thread(target=channel.start_consuming)
    mq_recieve_thread.start()

recieve_messages()
while True:
    print messages
    time.sleep(1)