Подтвердить что ты не робот

Как создать задержанную очередь в RabbitMQ?

Каков самый простой способ создать задержку (или парковку) с помощью Python, Pika и RabbitMQ? Я видел аналогичный questions, но ни один из них для Python.

Я нахожу это полезной идеей при разработке приложений, так как это позволяет нам дросселировать сообщения, которые нужно снова поставить в очередь.

Всегда есть вероятность, что вы получите больше сообщений, чем вы можете обрабатывать, возможно, HTTP-сервер работает медленно, или база данных находится под слишком большим стрессом.

Я также нашел, что это очень полезно, когда что-то пошло не так в сценариях, где существует нулевая толерантность к потере сообщений, и в то время как повторная очередь сообщений, которые не могут быть обработаны, может решить эту проблему. Это также может вызвать проблемы, когда сообщение будет снова и снова появляться в очереди. Потенциально вызывают проблемы с производительностью и лог-спам.

4b9b3361

Ответ 1

Я нашел это чрезвычайно полезным при разработке моих приложений. Поскольку это дает вам альтернативу простому повторному размещению ваших сообщений. Это может легко уменьшить сложность вашего кода и является одной из многих мощных скрытых функций в RabbitMQ.

Шаги

Сначала нам нужно настроить два основных канала: один для основной очереди и один для очереди ожидания. В моем примере в конце я включаю пару дополнительных флагов, которые не требуются, но делает код более надежным; таких как confirm delivery, delivery_mode и durable. Подробнее об этом вы найдете в руководстве RabbitMQ .

После того, как мы установили каналы, мы добавим привязку к основному каналу, которую мы можем использовать для отправки сообщений с канала задержки в нашу основную очередь.

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

Далее нам нужно настроить наш канал задержки для пересылки сообщений в основную очередь, как только они истекли.

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})
  • x-message-ttl (Сообщение - время для жизни)

    Обычно это используется для автоматического удаления старых сообщений в очереди после определенной продолжительности, но, добавив два необязательных аргумента, мы может изменить это поведение, и вместо этого этот параметр определяет в миллисекундах, сколько сообщений останется в очереди на задержку.

  • x-dead-letter-routing-key

    Эта переменная позволяет нам передавать сообщение в другую очередь как только они истекли, вместо поведения по умолчанию удаления это полностью.

  • x-dead-letter-exchange

    Эта переменная определяет, какой Exchange используется для передачи сообщения из hello_delay в очередь приветствия.

Публикация в очередь ожидания

Когда мы закончим настройку всех основных параметров Pika, вы просто отправляете сообщение в очередь ожидания, используя базовую публикацию.

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

После выполнения script вы должны увидеть следующие очереди, созданные в вашем модуле управления RabbitMQ. enter image description here

Пример.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer 
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print " [x] Sent"

Ответ 2

Вы можете использовать официальный плагин RabbitMQ: сообщение с задержкой x.

Во-первых, загрузите и скопируйте ez файл в Your_rabbitmq_root_path/plugins

Во-вторых, включите плагин (не нужно перезапускать сервер):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Наконец, опубликуйте сообщение с заголовками "x-delay", например:

headers.put("x-delay", 5000);

Примечание:

Это не гарантирует безопасность вашего сообщения, потому что, если ваше сообщение истекает только во время простоя rabbitmq-server, к сожалению, сообщение потеряно. Поэтому будьте осторожны, когда используете эту схему.

Наслаждайтесь этим и получите дополнительную информацию в rabbitmq-delayed-message-exchange

Ответ 3

FYI, как это сделать в Spring 3.2.x.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>


<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>

Ответ 4

Реализация NodeJS.

Все ясно из кода. Надеюсь, это сэкономит время.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});

Ответ 5

Сообщение в очереди кролика может быть отложено двумя способами    - используя QUEUE TTL    - использование сообщения TTL Если все сообщения в очереди должны быть отложены для фиксированной временной очереди TTL. Если каждое сообщение должно быть отложено в разное время, используйте сообщение TTL. Я объяснил это с помощью python3 и pika module. pika Параметр BasicProperties "expiration" в миллисекундах должен быть установлен для задержки сообщения в очереди ожидания. После установки времени истечения срока действия опубликуйте сообщение delayed_queue ( "не фактическая очередь, где потребители ждут, чтобы потреблять" ), как только сообщение в delayed_queue истечет, сообщение будет перенаправлено в реальную очередь, используя обмен "amq.direct"

def delay_publish(self, messages, queue, headers=None, expiration=0):
    """
    Connect to RabbitMQ and publish messages to the queue
    Args:
        queue (string): queue name
        messages (list or single item): messages to publish to rabbit queue
        expiration(int): TTL in milliseconds for message
    """
    delay_queue = "".join([queue, "_delay"])
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
    logging.info('Connecting to RabbitMQ: {host}'.format(
        host=self.rabbit_host))
    credentials = pika.PlainCredentials(
       RABBIT_MQ_USER, RABBIT_MQ_PASS)
    parameters = pika.ConnectionParameters(
       rabbit_host, RABBIT_MQ_PORT,
        RABBIT_MQ_VHOST, credentials, heartbeat_interval=0)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

    channel.queue_bind(exchange='amq.direct',
                       queue=queue)
    delay_channel = connection.channel()
    delay_channel.queue_declare(queue=delay_queue, durable=True,
                                arguments={
                                    'x-dead-letter-exchange': 'amq.direct',
                                    'x-dead-letter-routing-key': queue
                                })

    properties = pika.BasicProperties(
        delivery_mode=2, headers=headers, expiration=str(expiration))

    if type(messages) not in (list, tuple):
        messages = [messages]

    try:
        for message in messages:
            try:
                json_data = json.dumps(message)
            except Exception as err:
                logging.error(
                    'Error Jsonify Payload: {err}, {payload}'.format(
                        err=err, payload=repr(message)), exc_info=True
                )
                if (type(message) is dict) and ('data' in message):
                    message['data'] = {}
                    message['error'] = 'Payload Invalid For JSON'
                    json_data = json.dumps(message)
                else:
                    raise

            try:
                delay_channel.basic_publish(
                    exchange='', routing_key=delay_queue,
                    body=json_data, properties=properties)
            except Exception as err:
                logging.error(
                    'Error Publishing Data: {err}, {payload}'.format(
                        err=err, payload=json_data), exc_info=True
                )
                raise

    except Exception:
        raise

    finally:
        logging.info(
            'Done Publishing. Closing Connection to {queue}'.format(
                queue=delay_queue
            )
        )
        connection.close()

Ответ 6

В зависимости от вашего сценария и потребностей, я бы порекомендовал следующие подходы,