Подтвердить что ты не робот

Spring Загруженный встроенный кластер HornetQ не пересылает сообщения

Я пытаюсь создать статический кластер из двух приложений Spring для загрузки со встроенными серверами HornetQ. Одно приложение/сервер будет обрабатывать внешние события и генерировать сообщения для отправки в очередь сообщений. Другое приложение/сервер будет прослушивать очередь сообщений и обрабатывать входящие сообщения. Поскольку связь между двумя приложениями ненадежна, каждый будет использовать только локальные/inVM-клиенты для создания/потребления сообщений на своем соответствующем сервере и опираясь на функциональность кластеризации для пересылки сообщений в очередь на другом сервере в кластере.

Я использую HornetQConfigurationCustomizer для настройки встроенного сервера HornetQ, потому что по умолчанию он имеет только InVMConnectorFactory.

Я создал несколько примеров приложений, которые иллюстрируют эту настройку, в этом примере "ServerSend" относится к серверу, который будет создавать сообщения, а "ServerReceive" относится к серверу, который будет потреблять сообщения.

pom.xml для обоих приложений содержит:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>

DemoHornetqServerSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties(ServerSend):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

DemoHornetqServerReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties(ServerReceive):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

После запуска обоих приложений вывод журнала показывает следующее:

ServerSend:

2015-04-09 11:11: 58.471 INFO 7536 --- [main] org.hornetq.core.server: HQ221000: сервер работает с конфигурацией Конфигурация HornetQ (clustered = true, backup = false, sharedStore = правда, journalDirectory = C:\Users ****\AppData\Local\Temp\hornetq-данные/журнал, bindingsDirectory = данные/привязок, largeMessagesDirectory = данные /largemessages, pagingDirectory = данные/пейджинга)
2015-04-09 11:11: 58.501 INFO 7536 --- [main] org.hornetq.core.server: HQ221045: libaio недоступен, переключение конфигурации в NIO
2015-04-09 11:11: 58.595 INFO 7536 --- [main] org.hornetq.core.server: HQ221043: добавление поддержки протокола CORE
2015-04-09 11:11: 58.720 INFO 7536 --- [main] org.hornetq.core.server: HQ221003: пытается развернуть очередь jms.queue.jms.testqueue
2015-04-09 11:11: 59.568 INFO 7536 --- [main] org.hornetq.core.server: HQ221020: запущен Netty Acceptor версии 4.0.13.Final localhost: 5445
2015-04-09 11:11: 59.593 INFO 7536 --- [main] org.hornetq.core.server: HQ221007: Сервер теперь в прямом эфире
2015-04-09 11:11: 59.593 INFO 7536 --- [main] org.hornetq.core.server: HQ221001: версия HornetQ Server 2.4.5.FINAL(Wild Hornet, 124) [c139929d-d90f-11e4-ba2e -e58abf5d6944]

ServerReceive:

2015-04-09 11:12: 04.401 INFO 4528 --- [main] org.hornetq.core.server: HQ221000: сервер работает с конфигурацией HornetQ Configuration (кластеризованный = true, backup = false, sharedStore = правда, journalDirectory = C:\Users ****\AppData\Local\Temp\hornetq-данные/журнал, bindingsDirectory = данные/привязок, largeMessagesDirectory = данные /largemessages, pagingDirectory = данные/пейджинга)
2015-04-09 11:12: 04.410 INFO 4528 --- [main] org.hornetq.core.server: HQ221045: libaio недоступен, переключение конфигурации в NIO
2015-04-09 11:12: 04.520 INFO 4528 --- [main] org.hornetq.core.server: HQ221043: добавление поддержки протокола CORE
2015-04-09 11:12: 04.629 INFO 4528 --- [main] org.hornetq.core.server: HQ221003: пытается развернуть очередь jms.queue.jms.testqueue
2015-04-09 11:12: 05.545 INFO 4528 --- [main] org.hornetq.core.server: HQ221020: начальный Netty Acceptor версии 4.0.13.Final localhost: 5446
2015-04-09 11:12: 05.578 INFO 4528 --- [main] org.hornetq.core.server: HQ221007: Сервер теперь в прямом эфире
2015-04-09 11:12: 05.578 INFO 4528 --- [main] org.hornetq.core.server: HQ221001: HornetQ Server версия 2.4.5.FINAL(Wild Hornet, 124) [c139929d-d90f-11e4-ba2e -e58abf5d6944]

Я вижу clustered=true в обоих выводах, и это показывало бы false, если бы я удалил конфигурацию кластера из HornetQConfigurationCustomizer, поэтому он должен иметь некоторый эффект.

Теперь ServerSend показывает это на выходе консоли:

Отправка сообщения: Временная метка с сервера: 1428574324910
Отправка сообщения: Временная метка с сервера: 1428574329899
Отправка сообщения: Временная отметка с сервера: 1428574334904

Однако ServerReceive ничего не показывает.

Похоже, что сообщения не пересылаются из ServerSend в ServerReceive.

Я сделал еще несколько тестов, создав еще два Spring приложения для загрузки (ClientSend и ClientReceive), которые не имеют встроенного сервера HornetQ и вместо этого подключаются к "родному" серверу.

pom.xml для обоих клиентских приложений содержит:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>

DemoHornetqClientSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}

application.properties(ClientSend):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446

queue=jms.testqueue

DemoHornetqClientReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    @JmsListener(destination="${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

application.properties(ClientReceive):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445

queue=jms.testqueue

Теперь консоль показывает это:

ServerReveive:

Получено сообщение: Временная метка от Клиента: 1428574966630
Получено сообщение: Временная метка от Клиента: 1428574971600
Полученное сообщение: отметка времени от клиента: 1428574976595

ClientReceive:

Получено сообщение: Срок от сервера: 1428574969436
Получено сообщение: Срок от сервера: 1428574974438
Получено сообщение: Время с сервера: 1428574979446

Если у меня есть ServerSend, работающий некоторое время, а затем запустите ClientReceive, он также получит все сообщения, помещенные в очередь до этой точки, так что это показывает, что сообщения не просто исчезают где-то, либо могут быть использованы из другого места.

Для полноты я также указал ClientSend на ServerSend и ClientReceive на ServerReceive, чтобы узнать, есть ли проблемы с кластеризацией и клиентами InVM, но опять же не было выходов, указывающих на то, что любое сообщение было получено либо в ClientReceive, либо в ServerReceive.

Таким образом, кажется, что доставка сообщений в/из каждого из встроенных брокеров напрямую подключенным внешним клиентам работает нормально, но сообщения не пересылаются между брокерами в кластере.

Итак, после всего этого, большой вопрос, что неправильно с настройкой, что сообщения не пересылаются внутри кластера?

4b9b3361

Ответ 1

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

"Ядро HornetQ разработано как набор простых POJO, поэтому, если у вас есть приложение, для которого требуется функция обмена сообщениями внутри, но вы не хотите раскрывать это как сервер HornetQ, вы можете напрямую создавать и внедрять серверы HornetQ в свое приложение."

Если вы внедряете его, вы не подвергаете его работе как серверу. Каждый из ваших контейнеров имеет отдельный экземпляр. Это эквивалентно запуску 2-х экземпляров шершета и присвоению им одного и того же имени очереди. Один записывает в эту очередь в первом экземпляре, а другой слушает очередь на втором экземпляре.

Если вы хотите разделить ваши приложения таким образом, вам нужно иметь одно место, действующее как сервер. Наверное, вы хотите сгруппировать. Это не относится к Hornet, BTW. Эта схема часто встречается.