Подтвердить что ты не робот

Какую "EventBus" использовать в Spring? Встроенный реактор, Акка?

Мы запустим новое приложение Spring 4 через несколько недель. И мы хотели бы использовать некоторую управляемую событиями архитектуру. В этом году я читал здесь и здесь о "Реакторе" и, ища его в Интернете, наткнулся на "Акку".

Итак, теперь у нас есть 3 варианта:

Я не смог найти реального сравнения.


Теперь нам просто нужно что-то вроде:

  • X регистры для прослушивания Event E
  • Y регистры для прослушивания Event E
  • Z отправляет Event E

И затем X и Y будут получать и обрабатывать событие.

Скорее всего мы будем использовать это в асинхронном режиме, но наверняка будут и некоторые синхронные сценарии. И мы скорее всего отправляем всегда класс как событие. (Образцы Reactor в основном используют шаблоны String и String, но также поддерживают объекты).


Насколько я понял, ApplicationEvent работает синхронно по умолчанию, а Reactor работает как метод async. И Reactor также позволяет использовать метод await(), чтобы сделать его синхронным. Akka обеспечивает более или менее то же, что и Reactor, но также поддерживает Remoting.

Что касается метода Reactor await(): может ли он ждать завершения нескольких потоков? Или, может быть, даже частичный набор этих потоков? Если взять пример сверху:

  • X регистры для прослушивания Event E
  • Y регистры для прослушивания Event E
  • Z отправляет Event E

Можно ли сделать это синхронным, сказав: Подождите, пока закончите X и Y. И можно ли его подождать только для X, но не для Y?


Возможно, есть и альтернативы? Как насчет, например, JMS?

Множество вопросов, но, надеюсь, вы можете дать ответы на них.

Спасибо!


EDIT: примеры использования

  • Когда запускается определенное событие, я хотел бы создать 10000 писем. Каждое электронное письмо должно генерироваться с использованием пользовательского контента. Поэтому я создаю много потоков (max = системные ядра процессора), которые создают почту и не блокируют поток вызывающего, потому что это может занять несколько минут.

  • Когда запускается определенное событие, я хотел бы собирать информацию из неизвестного числа служб. Каждая выборка занимает около 100 мс. Здесь я мог бы предположить использование Reactor await, потому что мне нужна эта информация для продолжения моей работы в основном потоке.

  • Когда запускается определенное событие, я хотел бы выполнить некоторые операции на основе конфигурации приложения. Таким образом, приложение должно иметь возможность динамически (un) регистрировать comsumers/обработчики событий. Они будут делать свои вещи с Событием, и мне все равно. Поэтому я бы создал поток для каждого из этих обработчиков и просто продолжал выполнять свою работу в основном потоке.

  • Простая развязка: я в основном знаю все приемники, но я просто не хочу называть каждого получателя в моем коде. Это должно выполняться синхронно.

Звучит так, как будто мне нужен ThreadPool или RingBuffer. У этих структур есть динамические RingBuffers, которые растут по размеру, если это необходимо?

4b9b3361

Ответ 1

Я не уверен, что смогу адекватно ответить на ваш вопрос в этом небольшом пространстве. Но я сделаю это!:)

Spring ApplicationEvent система и реактор действительно отличаются друг от друга по функциональности. ApplicationEvent Маршрутизация основана на типе, обрабатываемом ApplicationListener. Что-то более сложное, чем это, и вам придется реализовать логику самостоятельно (что не обязательно плохо, хотя). Однако реактор обеспечивает комплексный слой маршрутизации, который также очень легкий и полностью расширяемый. Любое сходство в функции между двумя концами связано с возможностью подписки и публикации событий, что на самом деле является признаком любой управляемой событиями системы. Также не забывайте о новом модуле spring-messaging с Spring 4. Это подмножество инструментов, доступных в Spring Integration, а также предоставляет абстракции для построения вокруг управляемой событиями архитектуры.

Reactor поможет вам решить пару ключевых проблем, которые вам в противном случае пришлось бы управлять самостоятельно:

Соответствие выбора: Reactor выполняет Selector совпадение, которое охватывает диапазон совпадений - от простого вызова .equals(Object other) до более сложного соответствия шаблону URI, которое позволяет производить извлечение заполнителя. Вы также можете расширить встроенные селектора с помощью собственной пользовательской логики, чтобы вы могли использовать богатые объекты в качестве ключей уведомлений (например, объектов домена, например).

API-интерфейс Stream и Promise. Вы уже упоминали API Promise со ссылкой на метод .await(), который действительно предназначен для существующего кода, который ожидает блокировки. При написании нового кода с использованием Reactor его нельзя особо подчеркнуть, чтобы использовать композиции и обратные вызовы для эффективного использования системных ресурсов, не блокируя потоки. Блокировка вызывающего абонента практически никогда не является хорошей идеей в архитектуре, которая зависит от небольшого количества потоков для выполнения большого объема задач. Фьючерсы просто не облачно масштабируются, поэтому современные приложения используют альтернативные решения.

Ваше приложение может быть сконструировано с помощью Streams или Promises одним из них, хотя, честно говоря, я думаю, что вы найдете Stream более гибким. Ключевым преимуществом является способность API, позволяющая объединять действия в цепочке зависимостей без блокировки. В качестве примера, приведенного в полном объеме, на основе вашего используемого вами варианта использования электронной почты:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor также предоставляет Boundary, который в основном представляет собой CountDownLatch для блокировки на произвольных потребителях (так что вам не нужно создавать a Promise, если все, что вы хотите сделать, это блок для завершения Consumer). Вы можете использовать raw Reactor в этом случае и использовать методы on() и notify() для запуска проверки состояния службы.

Для некоторых вещей, однако, кажется, что вы хотите, чтобы Future возвращался из ExecutorService, нет? Почему бы просто не упростить вещи? Реактор будет иметь реальную выгоду в ситуациях, когда важна производительность и производительность накладных расходов. Если вы блокируете вызывающий поток, то вы, вероятно, собираетесь уничтожить эффективные выгоды, которые Reactor даст вам в любом случае, поэтому вам может быть лучше в этом случае использовать более традиционный набор инструментов.

Хорошая вещь об открытости реактора заключается в том, что ничего не мешает им взаимодействовать. Вы можете свободно смешивать Futures с Consumers без статичности. В этом случае просто имейте в виду, что вы всегда будете такими же быстрыми, как ваш самый медленный компонент.

Ответ 2

Давайте проигнорируем Spring ApplicationEvent поскольку он действительно не предназначен для того, что вы просите (его больше об управлении жизненным циклом боба).

Что вам нужно выяснить, если вы хотите сделать это

  1. объектно-ориентированный путь (т.е. субъекты, динамические потребители, зарегистрированные на лету) ИЛИ
  2. путь обслуживания (статические потребители, зарегистрированные при запуске).

Используя ваш пример X и Y, они:

  1. эфемерные экземпляры (1), или они
  2. долгоживущие синглтоны/объекты обслуживания (2)?

Если вам нужно регистрировать потребителей "на лету", а Akka - хороший выбор (я не уверен в реакторе, поскольку я его никогда не использовал). Если вы не хотите использовать свое потребление в эфемерных объектах, вы можете использовать JMS или AMQP.

Вам также необходимо понять, что эти библиотеки пытаются решить две проблемы:

  1. Параллелизм (т.е. Параллельное выполнение на той же машине)
  2. Распространение (т.е. Параллельное выполнение на нескольких машинах)

Реактор и Акка в основном сосредоточены на №1. Акка недавно добавила поддержку кластера, и абстракция актера упростила работу # 2. Очереди сообщений (JMS, AMQP) сосредоточены на # 2.

Для моей собственной работы я выполняю маршрут обслуживания и использую сильно измененные Guava EventBus и RabbitMQ. Я использую аннотации, похожие на Guava Eventbus, но также имеет аннотации для объектов, отправленных на шину, однако вы можете просто использовать Guava EventBus в режиме Async в качестве POC, а затем сделать свой собственный, как я.

Вы можете подумать, что вам нужны динамические потребители (1), но большинство проблем можно решить с помощью простого паба/под. Кроме того, управление динамическими потребителями может быть сложным (следовательно, Akka - хороший выбор, потому что у модели актера есть все виды управления для этого)

Ответ 3

Тщательно определите, что вы хотите от рамки. Если структура имеет больше возможностей, чем вам нужно, это не всегда хорошо. Дополнительные функции означают больше ошибок, больше кода для изучения и меньшей производительности.

Некоторые функции, которые необходимо учитывать:

  • характер участников (нитей или легких объектов)
  • способность работать на кластере машин (Akka)
  • постоянные очереди сообщений (JMS)
  • специальные функции, такие как сигналы (события без информации), переходы (объекты для объединения сообщений из разных портов в сложные события, см. Petri Nets) и т.д.

Будьте осторожны с синхронными функциями, такими как "ждут" - он блокирует весь поток и опасен, когда актеры исполняются в пуле потоков (потоковое голодание).

Дополнительные рамки для просмотра:

Fork-Join Pool - в некоторых случаях позволяет await без голодания нить

Научные системы документооборота

Структура потока данных для Java - сигналы, переходы

ADD-ON: два вида актеров.

Как правило, параллельная рабочая система может быть представлена ​​в виде графика, где активные узлы отправляют сообщения друг другу. В Java, как и в большинстве других основных языков, активные узлы (исполнители) могут быть реализованы либо как потоки, либо задачи (Runnable или Callable), выполняемые пулом потоков. Обычно часть участников - это потоки, а часть - это задачи. Оба подхода имеют свои преимущества и недостатки, поэтому очень важно выбрать наиболее подходящую реализацию для каждого действующего лица в системе. Вкратце, потоки могут блокировать (и ждать событий), но потребляют много памяти для своих стеков. Задачи могут не блокироваться, а использовать общие стеки (потоков в пуле).

Если задача вызывает операцию блокировки, она исключает объединенный поток из службы. Если многие задачи блокируются, они могут исключать все потоки, вызывая тупик - те задачи, которые могут разблокировать заблокированные задачи, не могут выполняться. Такой тупик называется потоком голода. Если в попытке предотвратить головоломку потоков, настройте пул потоков как неограниченный, мы просто преобразуем задачи в потоки, теряя преимущества задач.

Чтобы устранить вызовы блокировки операций в задачах, задача должна быть разделена на две (или более) - первую задачу блокировки операций и выходы, а остальная часть отформатирована как асинхронная задача, запущенная при завершении операции блокировки. Конечно, операция блокировки должна иметь альтернативный асинхронный интерфейс. Так, например, вместо синхронного чтения сокета следует использовать библиотеки NIO или NIO2.

К сожалению, стандартной java-библиотеке не хватает асинхронных копий для популярных функций синхронизации, таких как очереди и семафоры. К счастью, их легко реализовать с нуля (см. Структура потока данных для Java).

Таким образом, вычисление чисто с неблокирующими задачами возможно, но увеличивает размер кода. Очевидно, советуем использовать потоки, где это возможно, и задачи только для простых массовых вычислений.