Подтвердить что ты не робот

Сигнал откат от JMS MessageListener

Я работаю с JMS и ActiveMQ. Все творит чудеса. Я не использую spring, и я не могу.

Интерфейс javax.jms.MessageListener имеет только один метод, onMessage. В рамках реализации есть шанс, что будет выбрано исключение. Если на самом деле возникает исключение, то я говорю, что сообщение не обработано должным образом, и его необходимо повторить. Итак, мне нужно ActiveMQ подождать немного, а затем повторить попытку. т.е. мне нужно исключение для отката транзакции JMS.

Как я могу выполнить такое поведение?

Возможно, в ActiveMQ есть некоторая конфигурация, которую я не смог найти.

Или... может быть, может покончить с регистрацией MessageListener для потребителей и самостоятельно использовать сообщения в виде цикла, например:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSesstion(true, SESSION_TRANSACTED)
    try {
        Message m = receiver.receive(queue, 1000L);
        theMessageListener.onMessage(m);
        session.commit();
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

в паре потоков, вместо регистрации слушателя.

Или... Я мог бы как-то украсить/AOP/byte-манипулировать MessageListener, чтобы сделать это самостоятельно.

Какой маршрут вы возьмете и почему?

note: у меня нет полного контроля над кодом MessageListener.

ИЗМЕНИТЬ Тест для доказательства концепции:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());
}
4b9b3361

Ответ 1

Если вы хотите использовать SESSION_TRANSACTED в качестве режима подтверждения, вам необходимо настроить RedeliveryPolicy в Connection/ConnectionFactory. Эта страница на сайте ActiveMQ также содержит некоторую полезную информацию о том, что вам может понадобиться.

Поскольку вы не используете Spring, вы можете настроить RedeliveryPolicy с чем-то похожим на следующий код (взятый из одной из приведенных выше ссылок):

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

Edit Добавив в ответ ваш фрагмент кода, ниже показано, как это работает с транзакциями. Попробуйте этот код с помощью метода Session.rollback(), и вы увидите, что использование SESION_TRANSACTED и Session.commit/rollback работает как ожидалось:

@Test
public void test() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    assertEquals(3, atomicInteger.get());
}

}

Ответ 2

Вам нужно установить режим подтверждения на Session.CLIENT_ACKNOWLEDGE, клиент подтверждает принятое сообщение, вызвав метод подтверждения сообщения.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Затем после обработки сообщения необходимо вызвать метод Message.acknowledge(), чтобы удалить это сообщение.

Message message = ...;
// Processing message

message.acknowledge();

Ответ 3

Если ваш сеанс трансактен, тогда "confirmMode" игнорируется в любом случае. Итак, просто оставьте свой сеанс транзакцией и используйте session.rollback и session.commit для фиксации или отката транзакции.