Подтвердить что ты не робот

Как реализовать сагу с использованием шаблона разброса/сбора в MassTransit 3.0

Джимми Боагард описывает цепочку быстрого питания McDonalds здесь, сравнивая ее с диаграмма рассеяния.

Изображение рабочего процесса, украденное над статьей: введите описание изображения здесь

Начальные мысли о реализации:

Чтобы иметь общий интерфейс для всех типов событий, связанных с FoodOrdered, которые получат все продовольственные станции, а затем каждая станция питания сможет потреблять/создавать свой соответствующий элемент и публиковать общее событие. Пример: картошка фри и булочная с начинкой получает сообщение о заказе Fries. Станция жаркого, потребляющая заказ, объявляет ItemDoneEvent, который слушает сага.

Исходные проблемы:

Так как сага не заботится о том, какой тип пищи был завершен, только тот факт, что вся еда завершена, это, казалось бы, было бы правильным решением. Однако после предупреждает здесь относительно обмена очередями и замечает, что Consumer.Conditional фильтрация была удалена с помощью MassTransit 3.0 Кажется, что в рамках платформы говорится, что "Bad Things (TM) произойдет" с этим типом подхода. Но я не уверен, как еще вы это сделаете, не создавая запрос и ответ на сообщение, а также сопоставляя Event для каждого продукта питания на кухне. Пример: FriesOrdered, BurgerOrdered FriesCooked, BurgerCooked. Это было бы очень утомительно, если бы вам приходилось делать это для каждого предмета на кухне?

Учитывая вышеупомянутые проблемы - как выглядит хороший пример саги для такого типа рабочего процесса?

4b9b3361

Ответ 1

Проблема с отменой завершенных событий в саге заключается в том, что она создает конфликт на общем ресурсе (т.е. состоянии саги).

У Джима есть еще одно сообщение, которое появилось после того, на который вы ссылались, в котором изложена проблема и решение. Конечно, он специально говорит о NServiceBus, но проблема и понятия одинаковы.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

Создайте внешнее хранилище. Поместите запись для каждого рабочего элемента. Пусть каждый работник завершает свою работу, пока сага эффективно опросает задержку обмена сообщениями, чтобы убедиться, что все работы выполнены.

Затем вы по-прежнему выполняете сборку рассеяния, но "агрегатор" заменен шаблоном диспетчера процессов, чтобы уменьшить количество конфликтов.

Ответ 2

У меня возникла аналогичная проблема - нужно опубликовать несколько десятков команд (все тот же интерфейс, IMyRequest) и подождать все.

На самом деле моя команда инициирует другую сагу, которая публикует IMyRequestDone в конце обработки без сохранения саги. (Необходимо завершить их через некоторое время.) Поэтому вместо сохранения числа завершенных вложенных саг в родительской саге я просто запрашиваю состояние экземпляров детской саги.

Проверьте каждое сообщение MyRequestDone:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        )
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)

Периодически проверяйте, что все выполненные запросы ( "Уменьшение нагрузки Saga NServiceBus" ):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Recieved)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)           
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)              
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed));

Ответ 3

Не мог ли вы "просто" передать объект в очереди в качестве параметра события? Когда слушатель саги получает событие "завершено заказ", он будет содержать объект, который завершен в событии?

Я предполагаю, что он отправляется в очередь с помощью метода Generic, где объект должен реализовывать IFoodOrdered

Затем вы можете реализовать на практике виртуальный метод, который сага может использовать для создания "общей" вещи, когда она поднимается, и вам нужно только реализовать перегрузки для этих специальных элементов, для чего требуется что-то особенное?