У меня есть приложение, которое использует RabbitMQ в качестве очереди сообщений для отправки/получения сообщения между двумя компонентами: отправителем и получателем. Отправитель отправляет сообщение очень быстро. Приемник получает сообщение, а затем выполняет очень трудоемкую задачу (в основном, запись в базу данных для очень большого размера данных). Поскольку приемник занимает очень много времени, чтобы завершить задачу, а затем получить следующее сообщение в очереди, отправитель будет быстро заполнять очередь. Поэтому мой вопрос: приведет ли это к переполнению очереди сообщений?
Потребитель сообщений выглядит следующим образом:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
Каждое сообщение, полученное потребителем, содержит идентификатор caseID. Для каждого caseID он сохранит большой объем данных в базе данных, что занимает очень много времени. В настоящее время для RabbitMQ настроен только один потребитель, поскольку производитель/потребитель использует одну и ту же очередь для публикации/подписания caseID. Итак, как я могу ускорить пропускную способность потребителя, чтобы потребитель мог догнать производителя и избежать переполнения сообщения в очереди? Должен ли я использовать многопоточность в потребительской части для ускорения потребления? Или я должен использовать несколько потребителей для одновременного потребления входящего сообщения? Или есть асинхронный способ, чтобы потребитель потреблял сообщение асинхронно, не дожидаясь его завершения? Любые предложения приветствуются.