Я исследую решения для очередей для одного из моих командных приложений. В идеале мы хотели бы что-то, что можно настроить как в виде легкого, в процессе брокера (для обмена сообщениями с низкой пропускной способностью между потоками), так и в качестве внешнего брокера. Есть ли там сервер MQ, который может это сделать? Большинство из них, похоже, требуют установки в качестве внешнего объекта. Похоже, что ZeroMQ ближе всего подходит к процессинговому решению, но, похоже, он больше похож на "UDP-сокет на стероидах", и нам нужна надежная доставка.
Существуют ли какие-либо серверы MQ, которые могут запускаться в процессе Java?
Ответ 1
Как мы сказали, ActiveMQ
немного тяжелее, чем ZeroMQ
, но он отлично работает как встроенный процесс.
Вот простой пример с Spring
и ActiveMQ
.
Слушатель сообщений, который будет использоваться для проверки очереди:
public class TestMessageListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class);
@Override
public void onMessage(Message message) {
/* Receive the text message */
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
System.out.println("Message reception from the JMS queue : " + text);
} catch (JMSException e) {
logger.error("Error : " + e.getMessage());
}
} else {
/* Handle non text message */
}
}
}
ActiveMQ
контекстная конфигурация:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61617</value>
</property>
</bean>
<bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="jmsQueueConnectionFactory" />
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="messageQueue" />
</bean>
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="pooledJmsQueueConnectionFactory" />
<property name="pubSubDomain" value="false"/>
</bean>
<bean id="testMessageListener" class="com.example.jms.TestMessageListener" />
<bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" />
<property name="destination" ref="QueueDestination" />
<property name="messageListener" ref="testMessageListener" />
<property name="concurrentConsumers" value="5" />
<property name="acceptMessagesWhileStopping" value="false" />
<property name="recoveryInterval" value="10000" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
</beans>
Тест JUnit
:
@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"})
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests {
@Autowired
private JmsTemplate template;
@Autowired
private ActiveMQDestination destination;
@Test
public void testJMSFactory() {
/* sending a message */
template.convertAndSend(destination, "Hi");
/* receiving a message */
Object msg = template.receive(destination);
if (msg instanceof TextMessage) {
try {
System.out.println(((TextMessage) msg).getText());
} catch (JMSException e) {
System.out.println("Error : " + e.getMessage());
}
}
}
}
Зависимости для добавления в pom.xml
:
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework-version}</version>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.6.0</version>
</dependency>
Ответ 2
Клиент WebSphere MQ имеет возможность выполнять multicast pub/sub. Это обеспечивает возможность "клиент-клиент", которая обходит диспетчер очереди, хотя диспетчер очереди требуется для установления соединения.