Подтвердить что ты не робот

Является ли хорошей практикой использовать временную очередь JMS для синхронного использования?

Если мы используем механизм запроса/ответа JMS с помощью "Временной очереди", будет ли этот код масштабироваться?

На данный момент мы не знаем, будем ли мы поддерживать 100 запросов в секунду или 1000 запросов в секунду.

Ниже приведен код, который я собираюсь реализовать. Он использует JMS в синхронном режиме. Ключевыми частями являются то, где создается "Потребитель", чтобы указать "Временную очередь", созданную для этого сеанса. Я просто не могу понять, является ли использование таких временных рядов масштабируемым дизайном.

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);

Update:

Я столкнулся с другим шаблоном, см. этот блог Идея состоит в том, чтобы использовать "обычные" очереди для отправки и получения. Однако для вызовов "Синхронный", чтобы получить желаемый ответ (т.е. Соответствующий запрос), вы создаете Потребителя, который слушает очередь приема с помощью "Селектора".

Шаги:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
 final String correlationId = UUID.randomUUID().toString();
 final TextMessage textMessage = session.createTextMessage( msg );
 textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
           consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

Таким образом, разница в этом шаблоне заключается в том, что мы не создаем новую временную очередь для каждого нового запроса. Вместо этого все ответы поступают только в одну очередь, но используют "селектор", чтобы убедиться, что каждый запрос-поток получает единственный ответ, который волнует.

Я считаю, что недостатком здесь является то, что вам нужно использовать "селектор". Я пока не знаю, является ли это менее предпочтительным или более предпочтительным, чем ранее упомянутый образец. Мысли?

4b9b3361

Ответ 2

Интересно, что масштабируемость этого может фактически быть противоположной тому, что описаны в других ответах.

WebSphere MQ сохраняет и повторно использует объекты динамической очереди, где это возможно. Таким образом, хотя использование динамической очереди не является бесплатным, она действительно хорошо масштабируется, потому что, когда очереди освобождаются, все, что должен сделать WMQ, это передать дескриптор в следующий поток, который запрашивает новый экземпляр очереди. В занятом QMgr количество динамических очередей останется относительно статическим, в то время как обработчики передаются из потока в поток. Строго говоря, это не так быстро, как повторное использование одной очереди, но это неплохо.

С другой стороны, хотя индексирование на CORRELID выполняется быстро, производительность инвертируется по количеству сообщений в индексе. Это также имеет значение, если глубина очереди начинает строиться. Когда приложение отправляется GET с WAIT в пустой очереди, нет задержки. Но в глубокой очереди QMgr должен искать индекс существующих сообщений, чтобы определить, что ответное сообщение не входит в их число. В вашем примере разница между поиском пустого индекса и большим индексом 1000 раз в секунду.

В результате 1000 динамических очередей с одним сообщением могут быть быстрее, чем одна очередь с 1000 потоками, получающими CORRELID, в зависимости от характеристик приложения и нагрузки. Я бы рекомендовал проверить это на шкале перед тем, как совершить конкретный дизайн.

Ответ 3

Использование селектора по идентификатору корреляции в общей очереди будет очень хорошо масштабироваться с несколькими потребителями.

1000 запросов/с, однако, будут много. Возможно, вы захотите разделить нагрузку между различными экземплярами, если производительность окажется проблемой.

Возможно, вы захотите подробно рассказать о количестве запросов и номеров клиентов. Если количество клиентов равно < 10 и будет оставаться довольно статичным, а номера запросов очень высоки, наиболее устойчивым и быстрым решением может быть статическая очередь ответов для каждого клиента.

Ответ 4

Создание временных очередей не является бесплатным. Ведь он выделяет ресурсы на брокер (ы). Сказав это, если у вас есть неизвестное (перед рукой) потенциально несвязанное количество клиентов (несколько JVM, несколько параллельных потоков на JVM и т.д.), У вас может не быть выбора. Перераспределение клиентских очередей и их назначение клиентам быстро выходят из строя.

Конечно, то, что вы набросали, - это самое простое решение. И если вы можете получить реальные цифры для объема транзакции, и он масштабируется достаточно, отлично.

Прежде чем я попытаюсь избежать временных очередей, я бы больше хотел ограничить количество клиентов и сделать их долгожителями. То есть создайте пул клиентов на стороне клиента и попросите клиентов в пуле создать временную очередь, сеанс, соединение и т.д. При запуске, повторно использовать их при последующих запросах и оторвать их при завершении работы. Тогда проблема настройки становится одним из максимальных/минимальных размеров в пуле, что означает время простоя для объединения пула и поведение (сбой и блокировка) при максимальном пуле. Если вы не создаете произвольно большое количество переходных JVM (в этом случае у вас больше проблем с масштабированием только из-за начальных затрат JVM), это должно масштабироваться так же хорошо, как и все. В конце концов, в этот момент ресурсы, которые вы выделяете, отражают фактическое использование системы. На самом деле нет возможности использовать меньше.

Вещь, которой следует избегать, заключается в создании и уничтожении большого бесплатного количества очередей, сеансов, подключений и т.д. Создайте серверную часть, чтобы разрешить потоковое воспроизведение. Затем пул, если/когда вам нужно. Как и нет, для чего-то нетривиального, вам нужно будет.

Ответ 5

Использование временной очереди будет стоить каждый раз при создании relyToProducers. Вместо использования кэшированных производителей для статического answerToQueue метод createProducer будет более дорогостоящим и удачным в высококонкурентной среде вызова.

Ответ 6

Я столкнулся с одной и той же проблемой и решил объединить соединения сам внутри безстоящего bean. Одно клиентское соединение имеет один tempQueue и лежит внутри объекта JMSMessageExchanger (который содержит connectionFactory, Queue и tempQueue), который привязан к одному экземпляру bean. Ive проверил его в средах JSE/EE. Но я не очень уверен в отношении поведения пула Glassfish JMS. Будет ли это фактически закрывать JMS-соединения, полученные "вручную" после завершения метода bean? Я делаю что-то ужасно неправильно?

Также Ive отключил транзакцию в клиенте bean (TransactionAttributeType.NOT_SUPPORTED) для немедленной отправки сообщений запроса в очередь запросов.

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    //just get other message
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

Тот же класс используется в клиенте (без состояния bean) и сервере (@MessageDriven). RequestProducer и ResponseProducer - это интерфейсы:

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}
package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface  ResponseProducer extends RequestProducer{
    void setRequest(Message request);
    Message getRequest();
}

Также я прочитал статью AMQ о реализации запроса-ответа по AMQ: http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

Ответ 7

Возможно, я слишком поздно, но на этой неделе я провел несколько часов, чтобы синхронизировать запрос/ответ, работающий в JMS. Как насчет расширения QueueRequester с тайм-аутом. Я сделал и, по крайней мере, тестирование на одной машине (работающий брокер, запросчик и ответчик) показал, что это решение опережает обсуждаемые. С другой стороны, это зависит от использования QueueConnection, и это означает, что вы можете быть вынуждены открывать несколько подключений.