Подтвердить что ты не робот

RabbitMQ: быстрый производитель и медленный потребитель

У меня есть приложение, которое использует RabbitMQ в качестве очереди сообщений для отправки/получения сообщения между двумя компонентами: отправителем и получателем. Отправитель отправляет сообщение очень быстро. Приемник получает сообщение, а затем выполняет очень трудоемкую задачу (в основном, запись в базу данных для очень большого размера данных). Поскольку приемник занимает очень много времени, чтобы завершить задачу, а затем получить следующее сообщение в очереди, отправитель будет быстро заполнять очередь. Поэтому мой вопрос: приведет ли это к переполнению очереди сообщений?

Потребитель сообщений выглядит следующим образом:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

Каждое сообщение, полученное потребителем, содержит идентификатор caseID. Для каждого caseID он сохранит большой объем данных в базе данных, что занимает очень много времени. В настоящее время для RabbitMQ настроен только один потребитель, поскольку производитель/потребитель использует одну и ту же очередь для публикации/подписания caseID. Итак, как я могу ускорить пропускную способность потребителя, чтобы потребитель мог догнать производителя и избежать переполнения сообщения в очереди? Должен ли я использовать многопоточность в потребительской части для ускорения потребления? Или я должен использовать несколько потребителей для одновременного потребления входящего сообщения? Или есть асинхронный способ, чтобы потребитель потреблял сообщение асинхронно, не дожидаясь его завершения? Любые предложения приветствуются.

4b9b3361

Ответ 1

"Это приведет к переполнению очереди сообщений?"

Да. RabbitMQ войдет в состояние "управления потоком", чтобы предотвратить чрезмерное потребление памяти при увеличении длины очереди. Он также начнет переносить сообщения на диск, а не удерживать их в памяти.

"Итак, как я могу ускорить пропускную способность потребителя, чтобы потребитель может догнать производителя и избежать переполнения сообщения в очередь"

У вас есть 2 варианта:

  • Добавить потребителей. Имейте в виду, что теперь ваша БД будет управляться несколькими параллельными процессами, если вы выберете эту опцию. Убедитесь, что БД может выдерживать дополнительное давление.
  • Увеличьте значение QOS для канала потребления. Это приведет к увеличению количества сообщений из очереди и буферизации их на потребителя. Это увеличит общее время обработки; если 5 сообщений буферизованы, пятое сообщение займет время обработки сообщений 1... 5.

"Должен ли я использовать многопоточность в потребительской части для ускорения расход?"

Нет, если у вас нет хорошо разработанного решения. Добавление parallelism в приложение добавит много накладных расходов на стороне потребителя. Вы можете исчерпать использование ThreadPool или дросселирования памяти.

При работе с AMQP вам действительно нужно учитывать бизнес-требования для каждого процесса, чтобы разработать оптимальное решение. Как ваши входящие сообщения зависят от времени? Нужно ли им постоянно сохраняться в БД как можно скорее, или это имеет значение для ваших пользователей независимо от того, доступны ли эти данные немедленно?

Если данные не обязательно должны быть сохранены немедленно, вы можете изменить свое приложение, чтобы потребитель (пользователи) просто удалял сообщения из очереди и сохранял их в кэшированной коллекции, например, в Redis. Представьте второй процесс, который затем будет последовательно читать и обрабатывать кэшированные сообщения. Это гарантирует, что длина очереди недостаточно вырастает, чтобы обеспечить управление потоком, в то же время предотвращая бомбардировку вашей БД запросами на запись, которые обычно более дороги, чем запросы на чтение. Теперь ваш потребитель (пользователи) просто удаляет сообщения из очереди, а затем обрабатывается другим процессом.

Ответ 2

"Итак, как я могу ускорить пропускную способность потребителя, чтобы потребитель мог догнать производителя и избежать переполнения сообщения в очереди?" Это ответ "использовать несколько потребителей для одновременного использования входящего сообщения", использовать многопоточность для параллельной работы этих потребителей, реализующих принцип, разделяющий ничего, http://www.eaipatterns.com/CompetingConsumers.html

Ответ 3

У вас есть много способов повысить производительность.

  • Вы можете создать рабочую очередь с большим количеством производителей, таким образом вы создадите простую систему балансировки нагрузки. не используйте exchange --- > queue, а только очередь. Прочтите этот пост RabbitMQ Non-Round Robin Dispatching

  • Когда вы получаете сообщение, вы можете создать файл пула для вставки данных в свою базу данных, но в этом случае вам придется управлять сбоем.

Но я думаю, что основной проблемой является база данных, а не RabbitMQ. Благодаря хорошей настройке, многопоточности и рабочей очереди вы можете иметь масштабируемое и быстрое решение.

Сообщите мне

Ответ 4

Хотя верно, что добавление большего количества потребителей может ускорить процесс, реальная проблема будет сохраняться в базе данных.

Здесь уже много ответов, в которых говорится о добавлении потребителей (потоков и/или машин) и изменении QoS, поэтому я не буду повторять это. Вместо этого вам следует серьезно подумать об использовании шаблона Aggregator для агрегирования сообщений в группу сообщений, а затем пакетная вставка группы в вашу базу данных одним выстрел.

Ваш текущий код для каждого сообщения, вероятно, открывает соединение, вставляет данные и закрывает это соединение (или возвращается в пул). Хуже того, он может даже использовать транзакции.

Используя шаблон агрегатора, вы, по существу, буферизируете данные перед тем, как сбросить их.

Теперь писать хороший агрегатор сложно. Вам нужно будет решить, как вы хотите буферировать (т.е. Каждый рабочий имеет свой собственный буфер или центральный буфер, такой как Redis). Spring Интеграция имеет агрегатор, я считаю.

Ответ 5

В качестве ответа я предлагаю: оба.

Вы можете воспользоваться преимуществами наличия нескольких приемников, а также настроить каждого приемника для выполнения задачи в отдельном потоке, что позволит приемнику принять следующее сообщение в очереди.

Конечно, этот подход предполагает, что результат каждой операции (запись на db, если я правильно понял) никоим образом не влияет на результат последующих операций в ответ от других сообщений.