Мы пытаемся создать базовую систему направленной очереди, в которой производитель будет генерировать несколько задач, и один или несколько потребителей будут одновременно захватывать задание, обрабатывать и подтверждать сообщение.
Проблема в том, что обработка может занять 10-20 минут, и мы не отвечаем на сообщения в то время, заставляя сервер отключать нас.
Вот некоторый псевдокод для нашего потребителя:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
После завершения первой задачи исключение выбрасывается где-то глубоко внутри BlockingConnection, жалуясь, что сокет был reset. Кроме того, журналы RabbitMQ показывают, что потребитель был отключен, чтобы не реагировать вовремя (почему он сбрасывает соединение, а не посылает FIN, это странно, но об этом мы не будем беспокоиться).
Мы много искали, потому что считали, что это нормальный случай использования RabbitMQ (имея много длинных задач, которые должны быть разделены между многими потребителями), но похоже, что у кого-то еще не было этой проблемы. Наконец мы наткнулись на нить, где было рекомендовано использовать heartbeats и порождать long_running_task()
в отдельном потоке.
Итак, код стал:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
И это, похоже, работает, но это очень грязно. Мы уверены, что объект ch
является потокобезопасным? Кроме того, представьте, что long_running_task()
использует этот параметр соединения, чтобы добавить задачу в новую очередь (т.е. Первая часть этого длинного процесса завершена, давайте отправьте задачу на вторую часть). Итак, поток использует объект connection
. Безопасен ли этот поток?
Насколько это важно, какой предпочтительный способ сделать это? Я чувствую, что это очень грязно и, возможно, не потокобезопасно, так что, возможно, мы не делаем это правильно. Спасибо!