Подтвердить что ты не робот

Связь между обработчиками команд, агрегатами, хранилищем и хранилищем событий в CQRS

Я хотел бы узнать некоторые детали отношений между обработчиками команд, агрегатами, репозиторием и хранилищем событий в системах на основе CQRS.

То, что я понял до сих пор:

  • Обработчики команд получают команды от шины. Они несут ответственность за загрузку соответствующей совокупности из репозитория и вызов логики домена в совокупности. После этого они удаляют команду с шины.
  • Агрегат обеспечивает поведение и внутреннее состояние. Государство никогда не является публичным. Единственный способ изменить состояние - это использовать поведение. Методы, моделирующие это поведение, создают события из свойств команды и применяют эти события к агрегату, которые, в свою очередь, вызывают обработчики событий, которые соответствующим образом устанавливают внутреннее состояние.
  • Репозиторий просто позволяет загружать агрегаты по определенному идентификатору и добавлять новые агрегаты. В основном, репозиторий связывает домен с хранилищем событий.
  • Хранилище событий, но не в последнюю очередь, отвечает за хранение событий в базе данных (или в любом другом хранилище) и перезагрузку этих событий в виде так называемого потока событий.

До сих пор так хорошо. Теперь есть некоторые проблемы, которые я еще не получил:

  • Если обработчик команды должен вызывать поведение в еще существующем агрегате, все довольно просто. Обработчик команд получает ссылку на репозиторий, вызывает его метод loadById и возвращается суммарный результат. Но что делает обработчик команд, когда еще нет агрегата, но нужно создать? По моему мнению, совокупность должна быть позже перестроена с использованием событий. Это означает, что создание агрегата выполняется в ответ на событие fooCreated. Но чтобы иметь возможность хранить любое событие (включая fooCreated), мне нужен агрегат. Поэтому это выглядит как проблема с курицей и яйцом: я не могу создать агрегат без события, но единственным компонентом, который должен создавать события, является совокупность. Итак, в основном это сводится к: Как мне создать новые агрегаты, кто что делает?
  • Когда агрегат запускает событие, внутренний обработчик событий реагирует на него (как правило, вызывается посредством метода apply) и изменяет состояние агрегата. Как это событие передается в хранилище? Кто инициирует действие "отправить новые события в хранилище/хранилище событий"? Сама совокупность? Репозиторий, наблюдая за агрегатом? Кто-то, кто подписался на внутренние события?...?
  • И последнее, но не менее важное: у меня проблема с пониманием концепции потока событий правильно: в моем воображении это просто нечто вроде упорядоченного списка событий. Что важно, так это то, что он "приказал". Правильно ли это?
4b9b3361

Ответ 1

Следующее основано на моем собственном опыте и моих экспериментах с различными платформами, такими как Lokad.CQRS, NCQRS и т.д. Я уверен, что есть несколько способов справиться с этим. Я напишу то, что имеет для меня наибольшее значение.

1. Совокупное создание:

Каждый раз, когда обработчик команды нуждается в агрегате, он использует репозиторий. Репозиторий извлекает соответствующий список событий из хранилища событий и вызывает перегруженный конструктор, вводя события

var stream = eventStore.LoadStream(id)
var User = new User(stream)

Если агрегат раньше не существовал, поток будет пустым, и вновь созданный объект будет в нем исходным. Возможно, вам захочется убедиться, что в этом состоянии разрешено приводить в действие только несколько команд, например, User.Create().

2. Хранение новых событий

Обработка команд происходит внутри Единицы работы. Во время выполнения команды каждое результирующее событие будет добавлено в список внутри агрегата (User.Changes). После завершения исполнения изменения будут добавлены в хранилище событий. В приведенном ниже примере это происходит в следующей строке:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. Порядок событий

Представьте, что произойдет, если два последующих события CustomerMoved будут воспроизведены в неправильном порядке.

Пример

Я попытаюсь проиллюстрировать это с помощью фрагмента псевдокода (я намеренно оставил проблемы с репозиторией внутри обработчика команд, чтобы показать, что произойдет за кулисами):

Служба приложений:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

Общий счет:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        foreach (event in eventStream)
            this.Apply(event)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        this.Apply(new UserCreated(...))

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        this.Apply(new UserBlocked(...))

    Apply(userCreatedEvent)
        this.created = true
        this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent)
        this.blocked = true
        this.Changes.Add(userBlockedEvent)

Update:

В качестве побочной заметки: ответ Ив напомнил мне интересную статью Уди Дахана из пары лет назад:

Ответ 2

Небольшая вариация на Деннисе отличный ответ:

  • Когда вы имеете дело с "творческими" вариантами использования (т.е. это должно отражать новые агрегаты), попробуйте найти другой агрегат или factory, на который вы можете перенести эту ответственность. Это не противоречит тому, что имеет ctor, который принимает события для гидратации (или любого другого механизма для регидратации в этом отношении). Иногда factory представляет собой только статический метод (хорош для захвата контекста//намерения), иногда это метод экземпляра другого агрегата (хорошее место для наследования данных), иногда это явный объект factory ( хорошее место для "сложной" логики создания).
  • Мне нравится предоставлять явный метод GetChanges() в моем агрегате, который возвращает внутренний список в виде массива. Если мой агрегат должен оставаться в памяти за пределами одного исполнения, я также добавляю метод AcceptChanges(), чтобы указать, что внутренний список должен быть очищен (обычно вызывается после того, как вещи были сброшены в хранилище событий). Здесь вы можете использовать модель pull (GetChanges/Changes) или push (think.net event или IObservable). Многое зависит от транзакционной семантики, технологий, потребностей и т.д.
  • Ваш eventstream - связанный список. Каждая ревизия (event/changeset) указывает на предыдущую (a.k.a. parent). Ваш поток событий представляет собой последовательность событий/изменений, которые произошли с определенным агрегатом. Заказ должен быть гарантирован только в пределах общей границы.

Ответ 3

Я почти согласен с yves-reynhout и dennis-traub, но я хочу показать вам, как я это делаю. Я хочу лишить своих совокупностей ответственности применять эти события на себе или повторно увлажнять себя; в противном случае существует много дубликатов кода: каждый агрегатный конструктор будет выглядеть одинаково:

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

Эти обязанности могут быть переданы диспетчеру команд. Команда обрабатывается непосредственно агрегатом.

Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

Имейте в виду, что это псевдо-код, спроецированный из приложения PHP; реальный код должен иметь введенные вещи, а другие обязанности реорганизованы в других классах. Идея состоит в том, чтобы сохранить агрегаты как можно более чистыми и избегать дублирования кода.

Некоторые важные аспекты агрегатов:

  • обработчики команд не должны изменять состояние; они возвращают события или бросить исключения Применяется
  • не должно исключать и ничего не должно возвращать; они изменяют только внутреннее состояние