Подтвердить что ты не робот

Как выполнять обработку ошибок с помощью EasyNetQ/RabbitMQ

Я использую RabbitMQ в С# с библиотекой EasyNetQ. Здесь я использую шаблон pub/sub. У меня все еще есть несколько вопросов, которые, я надеюсь, кто-нибудь может мне помочь:

  • При появлении ошибки при использовании сообщения она автоматически перемещается в очередь ошибок. Как я могу реализовать повторные попытки (чтобы он помещался обратно в исходную очередь и когда он не обрабатывал X раз, он переместился в очередь с мертвой буквой)?
  • Насколько я вижу, всегда существует одна очередь ошибок, используемая для вывода сообщений из всех остальных очередей. Как я могу иметь 1 очередь ошибок для каждого типа, так что каждая очередь имеет свою собственную связанную очередь ошибок?
  • Как я могу легко повторять сообщения, которые находятся в очереди ошибок? Я пробовал Hosepipe, но он просто переиздает сообщения в очередь ошибок вместо исходной очереди. Мне не нравится этот вариант, потому что я не хочу играть в консоли. Предпочтительно, я просто программировал бы против очереди ошибок.

Кто-нибудь?

4b9b3361

Ответ 1

Проблема, с которой вы работаете в EasyNetQ/RabbitMQ, заключается в том, что она намного более "сырая" по сравнению с другими службами обмена сообщениями, такими как SQS или Azure Service Bus/Queues, но я сделаю все возможное, чтобы указать вам в правильном направлении,

Вопрос 1.

Это будет для вас. Самый простой способ - вы не можете отправить сообщение в RabbitMQ/EasyNetQ, и оно будет помещено во главе очереди для повторной попытки. Это не рекомендуется, потому что он будет почти сразу же повторен (без временной задержки), а также будет блокировать обработку других сообщений (если у вас есть один абонент с номером предварительной выборки 1).

Я видел другие реализации использования "MessageEnvelope". Таким образом, класс-оболочка, который, когда сообщение терпит неудачу, увеличивает значение переменной повтора в MessageEnvelope и возвращает сообщение обратно в очередь. Вы должны сделать это и написать код обертывания вокруг ваших обработчиков сообщений, это не будет функцией EasyNetQ.

Используя вышеизложенное, я также видел, как люди используют конверты, но допускают, чтобы сообщение было мертвым. После того, как он находится в очереди с мертвой буквой, в очереди мертвой буквы есть другое приложение/работник, читающий элементы.

Все эти подходы, приведенные выше, имеют небольшую проблему в том, что на самом деле нет хорошего способа иметь логарифмическую/экспоненциальную/любую возрастающую задержку при обработке сообщения. Вы можете "удерживать" сообщение в коде некоторое время, прежде чем возвращать его в очередь, но это не очень хорошо.

Из всех этих вариантов, вероятно, лучший способ - ваше собственное приложение, просматривающее очередь с мертвой буквой, и решение о перенаправлении сообщения на основе конверта, содержащего счетчик повторных попыток.

Вопрос 2.

Вы можете указать обмен мертвой буквой в очереди с помощью расширенного API. (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues). Однако это означает, что вам придется использовать расширенный API почти везде, так как использование простой реализации IBus для подписки/публикации ищет очереди, имена которых основаны как на типе сообщения, так и на имени абонента. Использование пользовательского объявления очереди означает, что вы будете обрабатывать наименования своих очередей самостоятельно, а это значит, что когда вы подписываетесь, вам нужно знать имя того, что вы хотите и т.д. Больше не подписывайтесь на вас!

Вопрос 3

Очередь ошибок/Очередь мертвой буквы - это еще одна очередь. Вы можете слушать эту очередь и делать то, что вам нужно сделать. Но на самом деле не существует какого-либо из готового решения, которое звучит так, как будто оно соответствует вашим потребностям.

Ответ 2

Я реализовал именно то, что вы описали. Вот несколько советов, основанных на моем опыте и связанных с каждым из ваших вопросов.

Q1 (как повторить X раз):

Для этого вы можете использовать IMessage.Body.BasicProperties.Headers. Когда вы отправляете сообщение из очереди ошибок, просто добавьте заголовок с выбранным именем. Ищите этот заголовок для каждого сообщения, входящего в очередь ошибок, и увеличивайте его. Это даст вам количество повторных попыток.

Очень важно, чтобы у вас была стратегия для того, что делать, когда сообщение превышает ограничение на повторение X. Вы не хотите терять это сообщение. В моем случае я пишу сообщение на диск в этот момент. Это дает вам много полезной информации для отладки, чтобы вернуться к ней позже, потому что EasyNetQ автоматически обертывает ваше исходное сообщение информацией об ошибках. У него также есть исходное сообщение, чтобы вы могли, если хотите, вручную (или, может быть, автоматизированным с помощью какого-либо кода повторной обработки партии) запросить сообщение позже определенным образом.

Вы можете посмотреть код в утилите Hosepipe, чтобы увидеть хороший способ сделать это. Фактически, если вы следуете шаблону, который видите там, вы можете даже использовать Hosepipe позже, чтобы запросить сообщения, если вам нужно.

Q2 (как создать очередь ошибок в исходящей очереди):

Вы можете использовать EasyNetQ Advanced Bus, чтобы сделать это чисто. Используйте IBus.Advanced.Container.Resolve<IConventions> для доступа к интерфейсу условных обозначений. Затем вы можете установить соглашения для именования очереди ошибок с помощью conventions.ErrorExchangeNamingConvention и conventions.ErrorQueueNamingConvention. В моем случае я устанавливаю соглашение на основе имени исходной очереди, чтобы каждый раз, когда я создавал очередь, я получал очередь очередей queue/queue_error.

Q3 (как обрабатывать сообщения в очереди ошибок):

Вы можете объявить пользователя для очереди ошибок так же, как и в любой другой очереди. Опять же, AdvancedBus позволяет сделать это чисто, указав, что тип, выходящий из очереди, - EasyNetQ.SystemMessage.Error. Итак, IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>() доставит вас туда. Повторная попытка просто означает повторную публикацию исходного обмена (обращая внимание на счет повтора, который вы помещаете в заголовок (см. Мой ответ на Q1, выше), а информация в сообщении об ошибке, которую вы уничтожили из очереди ошибок, может помочь вам найти цель для переиздание.