Подтвердить что ты не робот

ActiveMQ: как обращаться с отказоустойчивостью брокера при использовании временных очередей

В моих JMS-приложениях мы используем временные очереди для Продюсеров, чтобы получать ответы от потребительских приложений.

Я столкнулся с такой же проблемой на моем конце, как упоминалось в этом потоке: http://activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-H-A-failover-in-broker-network-with-Failover-tt-td3551034.html#a3612738

Всякий раз, когда я перезапускал произвольный брокер в своей сети, у меня возникало много ошибок, подобных этому в моем журнале приложений-приложениях при попытке отправить ответ на временную очередь:

javax.jms.InvalidDestinationException:
  Cannot publish to a deleted Destination: temp-queue://ID:...

Затем я увидел ответ Гэри, предлагающий использовать

jms.watchTopicAdvisories=false

как параметр url на клиенте brokerURL. С этим дополнительным параметром я быстро изменил URL-адреса клиентских брокеров. Однако теперь я вижу ошибки, подобные этому, когда я перезапускаю своих брокеров в сети для этого тестирования после сбоя:

javax.jms.JMSException: 
  The destination temp-queue:
    //ID:client.host-65070-1308610734958-2:1:1 does not exist.

Я использую версию ActiveMQ 5.5. И мой URL-адрес брокера клиента выглядит следующим образом:

failover:(tcp://amq-host1:61616,tcp://amq-host2.tred.aol.com:61616,tcp://amq-host3:61616,tcp://amq-host4:61616)?jms.useAsyncSend=true&timeout=5000&jms.watchTopicAdvisories=false

Кроме того, здесь приведен мой XML файл activemq для одного из 4-х брокеров: amq1.xml

Может кто-то здесь, пожалуйста, изучите эту проблему и предложите мне, какую ошибку я делаю в этой настройке.

Update:

Чтобы уточнить, как я выполняю запрос-ответ в своем коде:

  • Я уже использую назначение для каждого производителя (т.е. временную очередь) и устанавливаю его в заголовке ответа на каждое сообщение.
  • Я уже отправляю уникальный идентификатор корреляции для сообщения в заголовке JMSCorrelationID.
  • Насколько я знаю, даже Camel и Spring также используют временную очередь для механизма запроса-ответа. Единственное отличие состоит в том, что реализация Spring JMS создает и уничтожает временную очередь для каждого сообщения, тогда как я создаю временную очередь для жизни продюсера. Эта временная очередь уничтожается при закрытии приложения клиента (продюсера) или брокером AMQ, когда он понимает, что с этой временной очередью нет активного производителя.
  • Я уже устанавливаю истечение сообщения для каждого сообщения на стороне Продюсера, чтобы сообщение не задерживалось в очереди слишком долго (60 секунд).
4b9b3361

Ответ 1

Существует атрибут брокера, org.apache.activemq.broker.BrokerService # cacheTempDestinations, который должен помочь в отказе: case. Установите для этого значение true в xml-конфигурации, и временный пункт назначения не будет удален сразу, когда клиент отключится. Быстрый переход на другой ресурс: повторное подключение сможет продлить и/или потреблять из очереди temp снова.

Существует задача таймера, основанная на timeBeforePurgeTempDestinations (по умолчанию 5 секунд), которая обрабатывает удаление кеша.

Одно предупреждение, но я не вижу никаких тестов в activemq-core, которые используют этот атрибут, поэтому я не могу дать вам никаких гарантий на этом.

Ответ 2

Временные очереди создаются в брокере, к которому подключается запросчик (производитель) в сценарии запроса-ответа. Они создаются с помощью javax.jms.Session, поэтому при отключении этого сеанса либо из-за отказа клиента, либо от отказа/отказа брокера эти очереди постоянно исчезают. Ни один из других брокеров не поймет, что имеется в виду, когда один из ваших потребителей пытается ответить на эти очереди; следовательно, ваше исключение.

Это требует архитектурного сдвига в мышлении, предполагая, что вы хотите справиться с отказоустойчивостью и сохранить все свои сообщения. Вот общий способ атаки на проблему:

  • Ваши ответные заголовки должны ссылаться на очередь, относящуюся к процессу запроса: например. queue:response.<client id>. Идентификатор клиента может быть стандартным именем, если у вас ограниченное количество клиентов или UUID, если у вас их большое количество.
  • Исходящее сообщение должно установить идентификатор корреляции (просто sting, который позволяет связать запрос с запросами ответа - после того, как все могут сделать более одного запроса одновременно). Это указано в заголовке JMSCorrelationID и должно быть скопировано из запроса в ответное сообщение.
  • Проситель должен настроить слушателя в этой очереди, которая вернет тело сообщения в запрашивающий поток на основе этого идентификатора корреляции. Для этого нужно написать многопотоковый код, так как вам нужно вручную управлять чем-то вроде карты корреляционных идентификаторов с исходными потоками (возможно, через Futures).

Это аналогичный подход к использованию Apache Camel для запроса-ответа по обмену сообщениями.

Одна вещь, о которой следует помнить, заключается в том, что очередь не уходит, когда клиент делает, поэтому вы должны установить время для жизни в ответном сообщении, чтобы оно удалялось от брокера, если оно не было уничтожено, иначе вы получите отставание от непонятных сообщений. Вам также нужно будет настроить стратегию очереди с мертвой буквой, чтобы автоматически отменить истекшие сообщения.