Подтвердить что ты не робот

RabbitMQ: постоянное сообщение с обменом темой

Я очень новичок в RabbitMQ.

Я установил обмен темой. Потребители могут быть запущены после издателя. Я бы хотел, чтобы потребители могли получать сообщения, которые были отправлены до того, как они появились, и которые еще не были использованы.

Обмен настроен со следующими параметрами:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

Сообщения публикуются с помощью этого параметра:

delivery_mode => 2

Потребители используют get() для извлечения сообщений из обмена.

К сожалению, любое сообщение, опубликованное до того, как какой-либо клиент был потерян, потерян. Я использовал разные комбинации.

Я думаю, моя проблема в том, что обмен не содержит сообщений. Возможно, мне нужно иметь очередь между издателем и очередью. Но это, похоже, не работает с обменом "темами", где сообщения направляются с помощью ключа.

Любая идея, как я должен действовать. Я использую Perl-привязку Net:: RabbitMQ (не имеет значения) и RabbitMQ 2.2.0.

4b9b3361

Ответ 1

Вам нужна прочная очередь для хранения сообщений, если нет подключенных потребителей, доступных для обработки сообщений в момент их публикации.

Обмен не сохраняет сообщения, но может быть в очереди. Запутанная часть заключается в том, что обмен может быть отмечен как "долговечный", но все, что на самом деле означает, что сам обмен будет по-прежнему присутствовать, если вы перезапустите своего брокера, но он не означает, что любые сообщения, отправленные что обмен автоматически сохраняется.

Учитывая это, вот два варианта:

  • Выполните административный шаг, прежде чем запускать издатели для создания очереди (ов) самостоятельно. Для этого вы можете использовать веб-интерфейс или инструменты командной строки. Убедитесь, что вы создали его как прочную очередь, чтобы он сохранял любые сообщения, которые были перенаправлены на него, даже если активных пользователей нет.
  • Предполагая, что ваши потребители закодированы, чтобы всегда объявлять (и, следовательно, автоматически создавать) свои обмены и очереди при запуске (и объявлять их как долговечные), просто запустить всех своих потребителей хотя бы один раз до начиная с любых издателей. Это обеспечит правильное создание всех ваших очередей. Затем вы можете отключить потребителей, пока они действительно не понадобятся, потому что очереди будут постоянно хранить любые будущие сообщения, направленные им.

Я бы пошел на # 1. Не может быть много шагов для выполнения, и вы всегда можете script выполнить необходимые шаги, чтобы их можно было повторить. Плюс, если все ваши потребители будут тянуть из одной и той же очереди (а не иметь выделенную очередь), это действительно минимальная часть административных накладных расходов.

Очереди - это то, что нужно контролировать и контролировать. В противном случае вы можете закончить тем, что потребители-изгои объявляют прочные очереди, используя их в течение нескольких минут, но никогда больше. Вскоре после того, как у вас будет постоянно растущая очередь, ничто не уменьшит ее размер и надвигающийся апокалипсис брокера.

Ответ 2

Как упоминал Брайан, обмен не хранит сообщения и в основном отвечает за маршрутизацию сообщений на другой обмен /s или queue/s. Если обмен не связан с очередью, все сообщения, отправленные на этот обмен, будут "потеряны"

Вам не нужно объявлять постоянные клиентские очереди в издателе script, так как это может не быть масштабируемым. Очереди могут динамически создаваться вашими издателями и маршрутизироваться внутри, используя привязку обмена к обмену.

RabbitMQ поддерживает привязки обмена к обмену, которые позволят гибкость, развязку топологии и другие преимущества. Вы можете прочитать здесь RabbitMQ Exchange для привязки Exchange [AMPQ]

RabbitMQ Exchange для обмена ссылками

Example Topology

Пример кода Python для создания привязки обмена к обмену с персистентностью, если потребитель не присутствует с использованием очереди.

#!/usr/bin/env python
import pika
import sys


connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()


#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')