Как использовать значение смещения EventData ServiceBus - программирование
Подтвердить что ты не робот

Как использовать значение смещения EventData ServiceBus

У меня есть код, который использует данные события Service Bus, и я подозреваю, что мне нужно использовать свойство offset, поскольку в настоящее время моя программа (или, похоже,) повторно запускает одни и те же данные Event Hub снова и снова.

Мой код выглядит следующим образом:

public class EventHubListener : IEventProcessor
{
    private static EventHubClient _eventHubClient;        
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";        

    public void Start()
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();            
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());

            Console.WriteLine("Processing : " + partitionId);
        }
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {                
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);

Поскольку я получаю одни и те же сообщения снова и снова, я подозреваю, что мне нужно сделать что-то вроде этого:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);

Тем не менее, Offset - это строка, даже если она представляет собой числовое значение (например, "12345"). Документация по context.CheckPointAsync() заставила его казаться, что это может быть ответ; однако, выдавая, что в конце цикла, кажется, не имеет значения.

Итак, у меня есть два вопроса:

  1. Что такое офсет? Является ли это тем, что я думаю (т.е. числовым маркером для точки в потоке), и если да, то почему это строка?
  2. Зачем мне снова получать одни и те же сообщения? Поскольку я понимаю Event Hubs, хотя они гарантируют хотя бы один раз, как только Checkpoint является проблемой, я не должен получать одни и те же сообщения.

РЕДАКТИРОВАТЬ:

Спустя некоторое время, я придумал что-то, что позволяет избежать этой проблемы; однако я, конечно, не стал бы утверждать, что это решение:

var filteredMessages =
            messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);

Использование EventProcessorHost похоже, действительно ухудшило проблему; то есть не только переигрывались исторические события, но они, казалось, воспроизводились в произвольном порядке.

РЕДАКТИРОВАТЬ:

Я наткнулся на эту замечательную статью @Mikhail, которая, похоже, касается моей точной проблемы. Тем не мение; и, предположительно, корень моей проблемы (или один из них, если предположить, что это правильно, то я не уверен, почему использование EventProcessorHost не просто работает из коробки, как @Mikhail сказал себя в комментариях). Однако версия ICheckpointManager имеет только один метод интерфейса:

namespace Microsoft.ServiceBus.Messaging
{

    public interface ICheckpointManager
    {
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    }
}
4b9b3361

Ответ 1

Ваш заголовок должен быть центром событий, а не служебной шиной. По вашему вопросу:

  1. Хотя центр событий имеет аналогичный дизайн, как Kafka, но одна большая разница в том, что вы сами должны управлять смещениями. Брокер событий-хабов совершенно не знает о вашем смещении группы потребителей.
  2. Таким образом, event hub sdk предоставляет некоторый класс поддержки для хранения смещения в учетной записи хранилища, но по-прежнему необходимо вызвать контрольную точку вручную после обработки сообщения.