Можно ли отправить сообщение через RabbitMQ с некоторой задержкой? Например, я хочу завершить сеанс клиента через 30 минут, и я отправлю сообщение, которое будет обработано через 30 минут.
Отложенное сообщение в RabbitMQ
Ответ 1
С выпуском RabbitMQ v2.8 теперь доступна плановая доставка, но как косвенная функция: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
Ответ 2
В настоящее время это невозможно. Вы должны хранить отметки времени истечения срока действия в базе данных или что-то похожее, а затем иметь вспомогательную программу, которая считывает эти метки времени и ставит в очередь сообщение.
Отложенные сообщения часто запрашиваются, поскольку они полезны во многих ситуациях. Однако, если вам нужно прекратить сеансы клиентов, я считаю, что обмен сообщениями не является идеальным решением для вас, и что другой подход может работать лучше.
Ответ 3
Поскольку у меня нет достаточной репутации для добавления комментария, отправляйте новый ответ. Это просто дополнение к тому, что уже обсуждалось в http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
За исключением того, что вместо ttl для сообщений вы можете установить его на уровне очереди. Также вы можете избежать создания нового обмена только ради перенаправления сообщений в другую очередь. Вот пример кода Java:
Производитель:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class DelayedProducer {
private final static String QUEUE_NAME = "ParkingQueue";
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange", "");
arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i=0; i<5; i++) {
String message = "This is a sample message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message "+i+" got published to the queue!");
Thread.sleep(3000);
}
channel.close();
connection.close();
}
}
Потребитель:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Ответ 4
Похоже, этот пост в блоге описывает использование обмена мертвой буквой и сообщение ttl, чтобы сделать что-то подобное.
В приведенном ниже коде используются CoffeeScript и Node.JS для доступа к Rabbit и реализации чего-то подобного.
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
conn.queue key, {
arguments:{
"x-dead-letter-exchange":"immediate"
, "x-message-ttl": 5000
, "x-expires": 6000
}
}, ->
conn.publish key, {v:1}, {contentType:'application/json'}
conn.exchange 'immediate'
conn.queue 'right.now.queue', {
autoDelete: false
, durable: true
}, (q) ->
q.bind('immediate', 'right.now.queue')
q.subscribe (msg, headers, deliveryInfo) ->
console.log msg
console.log headers
Ответ 5
Благодаря ответу Нормана я смог реализовать его в NodeJS.
Все ясно из кода. Надеюсь, это сэкономит время.
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
Ответ 6
Есть два подхода, которые вы можете попробовать:
Старый подход: Установите заголовок TTL (время жизни) в каждом сообщении/очереди (политика), а затем введите DLQ для его обработки. после истечения срока действия ttl ваши сообщения будут перемещаться из DLQ в основную очередь, чтобы ваш слушатель мог обработать его.
Последний подход: Недавно RabbitMQ разработал RabbitMQ Delayed Message Plugin, с помощью которого вы можете добиться того же и этой поддержки плагина, доступной с RabbitMQ-3.5.8.
Вы можете объявить обмен с типом x-delayed-message, а затем публиковать сообщения с пользовательской задержкой заголовка заголовка, выражающей в миллисекундах время задержки для сообщения. Сообщение будет доставлено в соответствующие очереди после миллисекунд с задержкой x
Подробнее здесь: git
Ответ 7
Предположим, что у вас был контроль над потребителем, вы могли бы добиться отсрочки на таком потребителе:
Если мы уверены, что n-ое сообщение в очереди всегда имеет меньшую задержку, чем n + 1-е сообщение (это может случиться для многих случаев использования): производитель отправляет timeInformation в задании, передавая время, в которое требуется эта работа для выполнения (currentTime + delay). Потребитель:
1) Считывает запланированное время из задачи
2) если currentTime > scheduleTime идти вперед.
Else delay = scheduleTime - currentTime
сон за время, указанное задержкой
Пользователь всегда настроен с параметром concurrency. Таким образом, другие сообщения будут просто ждать в очереди до тех пор, пока потребитель не завершит работу. Таким образом, это решение может работать хорошо, хотя оно выглядит неудобно, особенно для больших временных задержек.