Подтвердить что ты не робот

Обработка длительных задач в pika/RabbitMQ

Мы пытаемся создать базовую систему направленной очереди, в которой производитель будет генерировать несколько задач, и один или несколько потребителей будут одновременно захватывать задание, обрабатывать и подтверждать сообщение.

Проблема в том, что обработка может занять 10-20 минут, и мы не отвечаем на сообщения в то время, заставляя сервер отключать нас.

Вот некоторый псевдокод для нашего потребителя:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

После завершения первой задачи исключение выбрасывается где-то глубоко внутри BlockingConnection, жалуясь, что сокет был reset. Кроме того, журналы RabbitMQ показывают, что потребитель был отключен, чтобы не реагировать вовремя (почему он сбрасывает соединение, а не посылает FIN, это странно, но об этом мы не будем беспокоиться).

Мы много искали, потому что считали, что это нормальный случай использования RabbitMQ (имея много длинных задач, которые должны быть разделены между многими потребителями), но похоже, что у кого-то еще не было этой проблемы. Наконец мы наткнулись на нить, где было рекомендовано использовать heartbeats и порождать long_running_task() в отдельном потоке.

Итак, код стал:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

И это, похоже, работает, но это очень грязно. Мы уверены, что объект ch является потокобезопасным? Кроме того, представьте, что long_running_task() использует этот параметр соединения, чтобы добавить задачу в новую очередь (т.е. Первая часть этого длинного процесса завершена, давайте отправьте задачу на вторую часть). Итак, поток использует объект connection. Безопасен ли этот поток?

Насколько это важно, какой предпочтительный способ сделать это? Я чувствую, что это очень грязно и, возможно, не потокобезопасно, так что, возможно, мы не делаем это правильно. Спасибо!

4b9b3361

Ответ 1

На данный момент лучше всего отключить пульс, это заставит RabbitMQ закрыть соединение, если вы слишком долго блокируете его. Я экспериментирую с управлением подключением pika core и циклом ввода-вывода, работающим в фоновом потоке, но не достаточно стабильным для выпуска.

Ответ 2

Я столкнулся с той же проблемой, что и у вас.
Мое решение:

  • отключить пульс на стороне сервера.
  • оценить максимальное время выполнения задачи
  • установить таймаут пульса клиента на время, полученное с шага 2

Почему это?

Как я тестирую со следующими случаями:

случай один
  • Сердцебиение сервера включено, 1800 секунд
  • клиент unset

Я все еще получаю ошибку, когда задача работает очень долго → 1800

дело два
  • отключить пульс сервера
  • отключить пульс клиента

На стороне клиента нет ошибки, кроме одной проблемы - когда клиент терпит крах (мой перезапуск os при некоторых ошибках), tcp-соединение все еще можно увидеть в плагине управления Rabbitmq. И это сбивает с толку.

третий случай
  • отключить пульс сервера
  • включите клиентское сердцебиение, установите его в ожидаемое максимальное время работы

В этом случае я могу динамически изменять каждый тепловой удар на индивидуализированном клиенте. На самом деле, я поставил сердцебиение на машины, которые разбились часто. Кроме того, я могу видеть автономную машину через плагин Rabbitmq Manangement.

Окружающая среда

ОС: centos x86_64
pika: 0,9.13
rabbitmq: 3.3.1

Ответ 3

Пожалуйста, не отключайте сердцебиение!

Начиная с Pika 0.12.0, используйте технику, описанную в этом примере кода, чтобы запустить долгосрочное задание в отдельном потоке, а затем подтвердить сообщение из этого потока.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы о StackOverflow.

Ответ 4

Не отключайте сердцебиение.
Лучшее решение - запустить задачу в отдельном потоке и установить prefetch_count 1 чтобы потребитель получал только 1 неподтвержденное сообщение, используя что-то вроде этого channel.basic_qos(prefetch_count=1)

Ответ 5

  1. Вы можете периодически вызывать connection.process_data_events() в вашем long_running_task(connection), эта функция будет отправлять сердцебиение на сервер при вызове и держать клиента pika подальше от закрытия.
  2. Установите значение сердцебиения больше, чем период connection.process_data_events() в BlockingConnection вашей pika.

Ответ 6

Вы также можете настроить новый поток, обработать сообщение в этом новом потоке и вызвать .sleep для соединения, пока этот поток .sleep чтобы предотвратить пропуски пульса. Вот пример блока кода, взятого из @gmr в github, и ссылка на проблему для дальнейшего использования.

import re
import json
import threading

from google.cloud import bigquery
import pandas as pd
import pika
from unidecode import unidecode

def process_export(url, tablename):
    df = pd.read_csv(csvURL, encoding="utf-8")
    print("read in the csv")
    columns = list(df)
    ascii_only_name = [unidecode(name) for name in columns]
    cleaned_column_names = [re.sub("[^a-zA-Z0-9_ ]", "", name) for name in ascii_only_name]
    underscored_names = [name.replace(" ", "_") for name in cleaned_column_names]
    valid_gbq_tablename = "test." + tablename
    df.columns = underscored_names

    # try:
    df.to_gbq(valid_gbq_tablename, "some_project", if_exists="append", verbose=True, chunksize=10000)
    # print("Finished Exporting")
    # except Exception as error:
    #     print("unable to export due to: ")
    #     print(error)
    #     print()

def data_handler(channel, method, properties, body):
    body = json.loads(body)

    thread = threading.Thread(target=process_export, args=(body["csvURL"], body["tablename"]))
    thread.start()
    while thread.is_alive():  # Loop while the thread is processing
        channel._connection.sleep(1.0)
    print('Back from thread')
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    params = pika.ConnectionParameters(host='localhost', heartbeat=60)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="some_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(data_handler, queue="some_queue")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    channel.close()

if __name__ == '__main__':
    main()
python

Ссылка: https://github.com/pika/pika/issues/930#issuecomment-360333837