Подтвердить что ты не робот

JMS - переход от одного к нескольким потребителям

У меня есть JMS-клиент, который создает сообщения и отправляет очередь JMS своему уникальному пользователю.

То, что я хочу, - это больше, чем один потребитель, получающий эти сообщения. Первое, что приходит мне на ум, - превратить очередь в тему, так что нынешние и новые пользователи могут подписаться и получить одно и то же сообщение, доставленное всем из них.

Это, очевидно, будет связано с модификацией текущего кода клиентов как с точки зрения производителя, так и со стороны потребителей.

Я также хотел бы посмотреть другие варианты, например, создать вторую очередь, чтобы мне не пришлось изменять существующего пользователя. Я считаю, что в этом подходе есть преимущества, такие как (исправьте меня, если я ошибаюсь), балансируя нагрузку между двумя разными очередями, а не одной, что может оказать положительное влияние на производительность.

Я хотел бы получить информацию об этих вариантах и ​​минусах/плюсах, которые вы можете увидеть. Любая обратная связь высоко ценится.

4b9b3361

Ответ 1

У вас есть несколько вариантов, как вы заявили.

Если вы преобразуете его в тему, чтобы получить тот же эффект, вам нужно будет сделать потребителей постоянными потребителями. Одна вещь, предлагаемая в очереди, - это настойчивость, если ваш потребитель не жив. Это будет зависеть от используемой системы MQ.

Если вы хотите придерживаться очередей, вы создадите очередь для каждого потребителя и диспетчера, который будет прослушивать исходную очередь.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Плюсы тем

  • Легче динамически добавлять новых потребителей. Все потребители получат новые сообщения без какой-либо работы.
  • Вы можете создавать круглые темы, чтобы Consumer_1 получал сообщение, затем Consumer_2, затем Consumer_3
  • Потребителям могут быть добавлены новые сообщения, вместо того чтобы запрашивать очередь, делая их реактивными.

Против тем

  • Сообщения не сохраняются, если ваш брокер не поддерживает эту конфигурацию. Если потребитель уходит в отставку и возвращается, можно пропустить пропущенные сообщения, если не настроены постоянные потребители.
  • Сложно разрешить Consumer_1 и Consumer_2 получать сообщение, но не Consumer_3. С диспетчером и очередями диспетчер не может поместить сообщение в очередь Consumer_3.

Плюсы очередей

  • Сообщения сохраняются, пока пользователь не удалит их.
  • Диспетчер может фильтровать, какие потребители получают, какие сообщения не помещают сообщения в соответствующие очереди потребителей. Это можно сделать с помощью тем через фильтры.

Недостатки очереди

  • Необходимо создать дополнительные очереди для поддержки нескольких потребителей. В динамической среде это было бы неэффективно.

При разработке системы обмена сообщениями я предпочитаю темы, так как это дает мне наибольшую власть, но, увидев, что вы уже используете очереди, вам потребуется изменить способ работы вашей системы для реализации тем.

Разработка и внедрение системы очередей с несколькими потребителями

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Источник

Имейте в виду, что вам необходимо позаботиться о таких проблемах, как обработка исключений проблем, повторное подключение к соединению и очереди, если вы потеряете соединение и т.д. Это просто предназначено, чтобы дать вам представление о том, как выполните то, что я описал.

В реальной системе я, вероятно, не выйду из первого исключения. Я бы позволил системе продолжать работать, насколько это было возможно, и регистрировать ошибки. Поскольку этот код стоит в том случае, если сообщение в одной очереди потребителей завершается неудачно, весь диспетчер остановится.

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

Ответ 2

Возможно, вам не придется изменять код; это зависит от того, как вы его написали.

Например, если ваш код отправляет сообщения с помощью MessageProducer, а не QueueSender, то он будет работать как для тем, так и для очередей. Аналогично, если вы использовали MessageConsumer, а не QueueReceiver.

По сути, в JMS-приложениях хорошо использовать неспецифические интерфейсы для взаимодействия с системой JMS, например MessageProducer, MessageConsumer, Destination и т.д. Если это случай, это "простое" "вопрос конфигурации.