Подтвердить что ты не робот

Принуждение EventProcessorHost к повторной доставке неудачных событий Event Hub Event Event для метода IEventProcessor.ProcessEvents

Приложение использует .NET 4.6.1 и пакет nuget Microsoft.Azure.ServiceBus.EventProcessorHost v2.0.2 вместе с зависимостью пакета WindowsAzure.ServiceBus v3.0.1 для обработки сообщений концентратора событий Azure.

Приложение имеет реализацию IEventProcessor. Когда необработанное исключение EventProcessorHost из метода ProcessEventsAsync EventProcessorHost никогда не отправляет эти сообщения повторно в работающий экземпляр IEventProcessor. (Анекдотически, он будет повторно отправлять, если приложение хостинга остановлено и перезапущено или если аренда потеряна и повторно получена.)

Есть ли способ заставить сообщение события, в результате которого EventProcessorHost повторно отправить EventProcessorHost в реализацию IEventProcessor?

В этом комментарии представлено одно возможное решение почти идентичного вопроса: пересылать необработанные сообщения EventHub в IEventProcessor.ProcessEventsAsync

В комментарии предлагается хранить копию последнего успешно обработанного сообщения о событии и явно указывать контрольные точки, используя это сообщение, когда возникает исключение в ProcessEventsAsync. Однако после реализации и тестирования такого решения EventProcessorHost прежнему не отправляет повторно. Реализация довольно проста:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}

Анализ вещей в действии: enter image description here

Частичный образец журнала доступен здесь: https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

4b9b3361

Ответ 1

TL;DR: Единственный надежный способ переигрывать неудачную партию событий в IEventProcessor.ProcessEventsAsync является - Shutdown EventProcessorHost (ака EPH) сразу - либо с помощью eph.UnregisterEventProcessorAsync() или завершения процесса - в зависимости от ситуации, Это позволит другим экземплярам EPH получить аренду для этого раздела и начать с предыдущей контрольной точки.

Прежде чем объяснить это - я хочу сказать, что это отличный вопрос, и действительно, это был один из самых сложных вариантов дизайна, который мы должны были сделать для EPH. На мой взгляд, это был компромисс ч/б: usability/supportability структуры EPH, а не Technical-Correctness.

Идеальная ситуация была бы такой: когда пользовательский код в IEventProcessorImpl.ProcessEventsAsync создает исключение - библиотека EPH не должна его перехватывать. Это должно было позволить это Exception - сбой процесса, и crash-dump ясно показывает, что callstack. Я до сих пор верю - это самое technically-correct решение.

Текущая ситуация: контракт IEventProcessorImpl.ProcessEventsAsync API & EPH:

  1. до тех пор, пока EventData может быть получен от службы EventHubs - продолжайте вызывать пользовательский обратный вызов (IEventProcessorImplementation.ProcessEventsAsync) с EventData's и если пользовательский обратный вызов выдает ошибки при вызове, уведомите EventProcessorOptions.ExceptionReceived.
  2. Код пользователя внутри IEventProcessorImpl.ProcessEventsAsync должен обрабатывать все ошибки и включать в себя Retry's мере необходимости. EPH не устанавливает никакого тайм-аута для этого обратного вызова, чтобы предоставить пользователям полный контроль над временем обработки.
  3. Если конкретное событие является причиной проблемы - пометьте EventData специальным свойством - для ex: type = poison-event и повторно отправьте в тот же EventHub (EventHub указатель на фактическое событие, скопируйте эти EventData.Offset и SequenceNumber в New EventData.ApplicationProperties) или перенаправьте его в очередь SERVICEBUS или сохраните в другом месте, в основном, идентифицируйте и отложите обработку ядовитого события.
  4. если вы обработали все возможные случаи и по-прежнему сталкиваетесь с Exceptions - catch'em & shutdown EPH или failfast процесса с этим исключением. Когда EPH возвращается - он начнёт с того места, где его оставили.

Почему проверка "старого события" НЕ работает (прочитайте это, чтобы понять EPH в целом):

За кулисами EPH запускает насос для каждого получателя раздела EventHub Consumergroup - работа которого заключается в том, чтобы запустить получатель с заданной checkpoint (если он есть) и создать выделенный экземпляр реализации IEventProcessor а затем receive из назначенного раздела EventHub из указанного Offset в контрольной точке (если не присутствует - EventProcessorOptions.initialOffsetProvider) и в конечном итоге вызвать IEventProcessorImpl.ProcessEventsAsync. Цель Checkpoint - обеспечить надежный запуск обработки сообщений, когда процесс EPH завершает работу и владелец раздела перемещается в другие экземпляры EPH. Таким образом, checkpoint будет потребляться только при запуске НАСОСА и НЕ будет считываться после запуска насоса.

Пока я пишу это, EPH находится на версии 2.2.10.

более общее чтение на Event Hubs...

Ответ 2

Простой ответ: Вы пробовали EventProcessorHost.ResetConnection(string partiotionId)?

Комплексный ответ: Это может быть проблема архитектуры, которая должна быть решена в конце, почему обработка завершилась неудачно? это была временная ошибка? повторная логика обработки является возможным сценарием? И так далее...