Подтвердить что ты не робот

Как я могу обрабатывать несколько сообщений одновременно из темы JMS (без очереди) с помощью java и spring 3.0?

Обратите внимание, что я хотел бы, чтобы несколько прослушивателей сообщений обрабатывали последовательные сообщения из темы одновременно. Кроме того, я хотел бы, чтобы каждый прослушиватель сообщений работал транзакционно, чтобы сбой обработки в данном слушателе сообщения привел к тому, что сообщение слушателя оставалось в теме.

spring DefaultMessageListenerContainer поддерживает concurrency только для очередей JMS.

Мне нужно создать несколько экземпляров DefaultMessageListenerContainers?

Если время течет вниз по вертикальной оси:

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

UPDATE:
Спасибо за ваши отзывы @T.Rob и @skaffman.

В результате я создал несколько DefaultMessageListenerContainers с concurrency=1, а затем поместил логику в прослушиватель сообщений так, чтобы только один поток обрабатывал данный идентификатор сообщения.

4b9b3361

Ответ 1

Вам не нужны несколько экземпляров DefaultMessageListenerContainer, нет, но вам нужно настроить DefaultMessageListenerContainer на одновременное использование concurrentConsumers свойство:

Укажите количество одновременных потребителей создавать. Значение по умолчанию: 1.

Указание более высокого значения для этого установка увеличит стандарт уровень запланированного параллельного потребители во время выполнения: это эффективно минимальное количество которые будут запланированных в любой момент времени. Это статическая установка; для динамического масштабирования, рассмотрите вопрос о Настройка "maxConcurrentConsumers" вместо этого.

Увеличение количества одновременных потребителям рекомендуется использовать масштабировать потребление сообщений входя из очереди. Однако обратите внимание что любые гарантии заказа теряются когда несколько потребителей зарегистрировано. В общем, придерживайтесь 1 для небольших томов.

Однако в нижней части экрана есть большое предупреждение:

Не увеличивайте количество одновременных потребителей для темы. Это приведет к одновременному потребление одного и того же сообщения, которое вряд ли когда-либо желательно.

Это интересно, и имеет смысл, когда вы об этом думаете. То же самое произошло бы, если бы у вас было несколько экземпляров DefaultMessageListenerContainer.

Я думаю, вам, возможно, нужно переосмыслить свой дизайн, хотя я не уверен, что я предлагаю. Одновременное потребление сообщений pub/sub кажется вполне разумным, но как избежать того, чтобы одно и то же сообщение доставлялось всем вашим потребителям одновременно?

Ответ 2

По крайней мере, в ActiveMQ все, что вам нужно, полностью поддерживается, его имя VirtualTopic

Концепция такова:

  • Вы создаете VirtualTopic (просто создайте тему, используя префикс VirtualTopic.), например. VirtualTopic.Color
  • Создайте пользователя, подписавшегося на этот VirtualTopic, соответствующий этому шаблону Consumer.<clientName>.VirtualTopic.<topicName> например. Consumer.client1.VirtualTopic.Color, сделав это, Activemq создаст очередь с этим именем и эта очередь будет подписана на VirtualTopic.Color, тогда каждое сообщение, опубликованное в этой виртуальной теме, будет доставлено в client1, обратите внимание, что он работает как обмены rabbitmq.
  • Вы закончили, теперь вы можете потреблять client1 очередь, как и каждая очередь, со многими потребителями, DLQ, настраиваемая политика переопределения и т.д.
  • В этот момент, я думаю, вы поняли, что можете создать client2, client3 и сколько подписчиков, которые вы хотите, все они получат копия сообщения, опубликованного в VirtualTopic.Color

Здесь код

@Component
public class ColorReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // simply generating data to the topic
    long id=0;
    @Scheduled(fixedDelay = 500)
    public void postMail() throws JMSException, IOException {

        final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
        final Color color = new Color(++id, colorName.getName());
        final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setObject(color);
        message.setProperty("color", color.getName());
        LOGGER.info("status=color-post, color={}", color);
        jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
        selector = "color <> 'RED'"
    )
    public void genericReceiveMessage(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver, color={}", color);
    }

    /**
     * Listen only red colors messages
     *
     * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
     * the containers clientId need to be different between each other
     */
    @JmsListener(
//      destination = "Consumer.redColorContainer.VirtualTopic.color",
        destination = "Consumer.client1.VirtualTopic.color",
        containerFactory = "redColorContainer", selector = "color='RED'"
    )
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
        LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
    )
    public void genericReceiveMessage2(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver-2, color={}", color);
    }

}

@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {

    /**
     * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
     * clientIds per consumer pool (as two @JmsListener above, or two application instances)
     * 
     */
    @Bean
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-5");
        configurer.configure(factory, connectionFactory);
        // container.setClientId("aId..."); lets spring generate a random ID
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        // necessary when post serializable objects (you can set it at application.properties)
        connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-2");
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

public class Color implements Serializable {

    public static final Color WHITE = new Color("WHITE");
    public static final Color BLUE = new Color("BLUE");
    public static final Color RED = new Color("RED");

    private String name;
    private long id;

    // CONSTRUCTORS, GETTERS AND SETTERS
}

Ответ 3

Здесь есть возможность:

1) создайте только один DMLC, настроенный с помощью bean и метод обработки входящего сообщения. Установите для параметра concurrency значение 1.

2) Настройте исполнитель задачи С#threads, равным concurrency, который вы желаете. Создайте пул объектов для объектов, которые на самом деле должны обрабатывать сообщение. Дайте ссылку на исполнитель задачи и пул объектов на bean, настроенный в # 1. Пул объектов полезен, если фактическая обработка сообщений bean не является потокобезопасной.

3) Для входящего сообщения bean в DMLC создает пользовательский Runnable, указывает его на сообщение и пул объектов и передает его исполнителю задач.

4) Метод запуска Runnable получает bean из пула объектов и вызывает его метод "process" с указанным сообщением.

# 4 можно управлять с помощью прокси и пула объектов, чтобы упростить его.

Я еще не пробовал это решение, но он, похоже, соответствует законопроекту. Обратите внимание, что это решение не так надежно, как EJB MDB. Spring например. не отбрасывает объект из пула, если он генерирует исключение RuntimeException.

Ответ 4

Это один из тех случаев, когда различия в транспортных провайдерах выходят за рамки абстракции JMS. JMS хочет предоставить копию сообщения для каждого абонента по теме. Но поведение, которое вы хотите, действительно является положением в очереди. Я подозреваю, что есть другие требования, связанные с этим, с решением pub/sub, которое не описывалось - например, другие вещи должны подписываться на одну и ту же тему независимо от вашего приложения.

Если бы я должен был сделать это в WebSphere MQ, решение было бы создать административную подписку, которая привела бы к тому, что отдельная копия каждого сообщения по данному вопросу была помещена в очередь. Тогда ваши многочисленные подписчики могут конкурировать за сообщения в этой очереди. Таким образом, ваше приложение может иметь несколько потоков, среди которых распространяются сообщения, и в то же время другие подписчики, независимые от этого приложения, могут динамически (un) подписываться на одну и ту же тему.

К сожалению, нет никакого общего JMS-портативного способа сделать это. В значительной степени вы зависите от реализации поставщика транспорта. Единственное, с чем я могу поговорить, это WebSphere MQ, но я уверен, что другие транспорты поддерживают это так или иначе и в разной степени, если вы созидательны.

Ответ 6

Я столкнулся с той же проблемой. В настоящее время я изучаю RabbitMQ, который, кажется, предлагает идеальное решение в шаблоне проектирования, который они называют "рабочими очередями". Подробнее здесь: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

Если вы не полностью привязаны к JMS, вы можете изучить это. Также может быть JMS для AMQP-моста, но это может показаться взломанным.

Я получаю удовольствие (читаю: трудности), получая RabbitMQ, установленный и запущенный на моем Mac, но думаю, что я близок к тому, что он работает, я отправлю сообщение, если смогу это решить.

Ответ 7

Создание собственного исполнителя задач, казалось бы, решило проблему для меня, без дублирования обработки:

@Configuration
class BeanConfig {
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskExecutor topicExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(300);
        executor.setCorePoolSize(4);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("TOPIC-");
        return executor;
    }

    @Bean
    JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, @Qualifier("topicExecutor") Executor topicExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(false);
        factory.setSubscriptionDurable(false);
        factory.setTaskExecutor(topicExecutor);
        return factory;
    }

}

class MyBean {
    @JmsListener(destination = "MYTOPIC", containerFactory = "topicListenerFactory", concurrency = "1")
    public void receiveTopicMessage(SomeTopicMessage message) {}
}

Ответ 8

на конфигурациях server.xml:

           Таким образом, в maxSessions вы можете определить количество сеансов, которые вы хотите.

Ответ 9

Перешел через этот вопрос. Моя конфигурация:

Создайте bean с помощью id="DefaultListenerContainer", добавьте свойство name="concurrentConsumers" value="10" и свойство name="maxConcurrentConsumers" value ="50".

Прекрасно работает. Я напечатал идентификатор потока и проверил, что несколько потоков создаются и повторно используются.