Подтвердить что ты не робот

RabbitMQ по примеру: несколько потоков, каналов и очередей

Я просто прочитал RabbitMQ Java API docs и нашел его очень информативным и прямым. Пример того, как настроить простой Channel для публикации/потребления, очень легко понять и понять. Но это очень простой/базовый пример, и это оставило мне важный вопрос: Как настроить 1+ Channels для публикации/потребления в нескольких очередях и из них?

Скажем, у меня есть сервер RabbitMQ с тремя очередями на нем: logging, security_events и customer_orders. Таким образом, нам понадобится один Channel, чтобы иметь возможность публиковать/потреблять все три очереди или, более вероятно, иметь 3 отдельных Channels, каждый из которых посвящен одной очереди.

Кроме того, лучшие практики RabbitMQ диктуют, что мы настроили 1 Channel на потребительский поток. В этом примере предположим, что security_events отлично работает только с одним потребительским потоком, но logging и customer_order оба требуют 5 потоков для обработки тома. Итак, если я правильно понимаю, значит ли это, что нам нужно:

  • 1 Channel и 1 потребительская нить для публикации/потребления в и из security_events; и
  • 5 Channels и 5 потребительских потоков для публикации/потребления в и из logging; и
  • 5 Channels и 5 потребительских потоков для публикации/потребления в и из customer_orders?

Если мое понимание здесь ошибочно, начните с исправления. В любом случае, может ли какой-то изнурительный боец ​​RabbitMQ ветеран помочь мне "соединить точки" с достойным примером кода для создания издателей/потребителей, которые отвечают моим требованиям здесь? Спасибо заранее!

4b9b3361

Ответ 1

Я думаю, что у вас есть несколько проблем с начальным пониманием. Честно говоря, я немного удивлен, увидев следующее: both need 5 threads to handle the volume. Как вы определили, что вам нужен этот точный номер? У вас есть гарантии, что 5 потоков будет достаточно?

RabbitMQ настроен и проверен временем, так что это все о правильном дизайне и эффективная обработка сообщений.

Попробуйте проанализировать проблему и найти правильное решение. BTW, сама очередь сообщений не будет предоставлять никаких гарантий, что у вас действительно хорошее решение. Вам нужно понять, что вы делаете, а также провести дополнительное тестирование.

Как вы определенно знаете, есть много вариантов:

enter image description here

Я использую макет B как самый простой способ проиллюстрировать проблему 1 производителя N потребителей. Так как вы так обеспокоены пропускной способностью. Кстати, как вы могли бы ожидать, RabbitMQ ведет себя неплохо (источник). Обратите внимание на prefetchCount, я расскажу об этом позже:

enter image description here

Таким образом, логика обработки сообщений, вероятно, является правильным местом для обеспечения достаточной пропускной способности. Естественно, вы можете охватить новый поток каждый раз, когда вам нужно обработать сообщение, но в конечном итоге такой подход убьет вашу систему. В основном, больше потоков у вас больше латентности вы получите (вы можете проверить закон Амдаля, если хотите).

enter image description here

(см. Закон Амдаля, иллюстрированный)

Совет # 1: Будьте осторожны с потоками, используйте ThreadPools (подробнее)

Пул потоков можно описать как набор объектов Runnable (рабочая очередь) и соединения запущенных потоков. Эти потоки постоянно работают и проверяют рабочий запрос на новую работу. Если есть новая работа, которую нужно выполнить, они выполняют этот Runnable. Нить сам класс предоставляет способ, например. выполнить (Runnable r), чтобы добавить новый Runnable object to work queue.

public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
} 

Совет № 2: Будьте осторожны с накладными сообщениями

Я бы сказал, что это очевидная техника оптимизации. Скорее всего, вы будете отправлять небольшие и простые в обработке сообщения. Весь подход заключен в том, что небольшие сообщения должны постоянно устанавливаться и обрабатываться. Большие сообщения в конце концов будут играть плохую шутку, поэтому лучше избегать этого.

enter image description here

Так что лучше отправить крошечные фрагменты информации, но как насчет обработки? Каждый раз, когда вы отправляете задание, накладные расходы накладные. Пакетная обработка может быть очень полезна в случае высокой скорости входящего сообщения.

enter image description here

Например, допустим, что у нас есть простая логика обработки сообщений, и мы не хотим иметь потоки с конкретными потоками каждый раз, когда сообщение обрабатывается. Чтобы оптимизировать этот очень простой CompositeRunnable can be introduced:

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}

Или сделайте то же самое несколько иначе, собрав сообщения для обработки:

class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

Таким образом, вы можете обрабатывать сообщения более эффективно.

Совет № 3: оптимизация обработки сообщений

Несмотря на то, что вы знаете, что можно обрабатывать сообщения параллельно (Tip #1) и уменьшать накладные расходы на обработку (Tip #2), вам нужно делать все быстро. Избыточные этапы обработки, тяжелые циклы и т.д. Могут сильно повлиять на производительность. См. Интересные тематические исследования:

enter image description here

Улучшение пропускной способности очереди сообщений десятикратным путем выбора правильного XML-анализатора

Совет № 4: Управление соединениями и каналами

  • Запуск нового канала в существующем соединении включает в себя одну сеть round trip - запуск нового соединения занимает несколько.
  • Каждое соединение использует файловый дескриптор на сервере. Каналов нет.
  • Публикация большого сообщения на одном канале блокирует соединение в то время как он гаснет. Кроме того, мультиплексирование довольно прозрачно.
  • Связи, которые публикуются, могут блокироваться, если сервер перегружен - это хорошая идея отделить публикацию и потребление соединения
  • Будьте готовы к обработке пакетов сообщений

(источник)

Обратите внимание, что все советы прекрасно работают вместе. Не стесняйтесь, дайте мне знать, если вам нужны дополнительные данные.

Полный пример потребителя (источник)

Обратите внимание на следующее:

  • channel.basicQos(упреждающий)- Как вы видели ранее prefetchCount, может оказаться очень полезным:

    Эта команда позволяет потребителю выбрать окно предварительной выборки, которое определяет количество неподтвержденных сообщений, которые он готов Получать. Установив счетчик предварительной выборки на ненулевое значение, брокер не будет доставлять потребителю никаких сообщений, которые предел. Чтобы переместить окно вперед, потребитель должен признать получение сообщения (или группы сообщений).

  • ExecutorService threadExecutor - вы можете указать правильно настроенную службу-исполнитель.

Пример:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}

Вы также можете проверить следующее:

Ответ 2

Как настроить 1+ каналы для публикации/потребления в нескольких очередях и из них?

Вы можете реализовать потоки и каналы. Все, что вам нужно, это способ классифицировать вещи, то есть все элементы очереди из входа, все элементы очереди из security_events и т.д. Катагоризация может быть с помощью routingKey.

т.е.: каждый раз, когда вы добавляете элемент в очередь и указываете маршрутизацию ключ. Он будет добавлен как элемент свойства. Этим вы можете получить значения из определенного события говорят о регистрации.

В следующем примере кода объясняется, как сделать это на стороне клиента.

Например:

Используется ключ маршрутизации, который идентифицирует тип канала и извлекает типы.

Например, если вам нужно получить все каналы о типе Login то вы должны указать ключ маршрутизации как логин или другое ключевое слово чтобы идентифицировать это.

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

Вы можете посмотреть здесь для более подробной информации о категоризации.


Часть темы

Как только часть публикации закончится, вы можете запустить часть потока.

В этой части вы можете получить опубликованные данные на основе категории. то есть; маршрутизирующий ключ, который в вашем случае регистрирует, security_events и customer_orders и т.д.

посмотрите в примере, чтобы узнать, как извлекать данные в потоках.

Например:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

Теперь поток, обрабатывающий данные в очереди тип входа (ключ маршрутизации). Таким образом, вы можете создавать несколько потоков. Каждая услуга различной цели.

посмотрите здесь для более подробной информации о части потоков.

Ответ 3

Почему реализовать все самостоятельно?

Попробуйте использовать какую-то инфраструктуру интеграции. Допустим, Camel уже имеет связку коннекторов для разных систем, Rabbit MQ включен Camel Rabbit MQ.

Вы должны просто определить свои маршруты. Например:

Вы хотите потреблять сообщения из очереди ведения журнала 5 одновременными потребителями в файл.

from("rabbitmq://localhost/Logging ?concurrentConsumers=5")
.to("file://yourLoggingFile")

Существует много параметров, как установить пользователя файла. Как вы можете видеть, вы можете определить, сколько потребителей должно появиться, просто вставив concurrentConsumers=5 в ваш URI. Если вы хотите, чтобы вы могли создать своего собственного производителя или потребителя, реализовав интерфейс процессора.

Это очень универсальная и мощная среда, в которой вы можете много работать, используя предоставленные компоненты. Веб-страница проекта содержит множество примеров и документации.