Как использовать прерыватель с несколькими типами сообщений - программирование
Подтвердить что ты не робот

Как использовать прерыватель с несколькими типами сообщений

Моя система имеет два разных типа сообщений - тип A и B. Каждое сообщение имеет другую структуру: тип A содержит член int, а тип B содержит двойной член. Моя система должна передавать оба типа сообщений для многочисленных потоков бизнес-логики. Уменьшение задержки очень важно, поэтому я расследую использование Disruptor для передачи сообщений из основного потока в потоки бизнес-логики с механической симпатией.

Моя проблема заключается в том, что разрушитель принимает только один тип объекта в кольцевом буфере. Это имеет смысл, потому что прерыватель предварительно выделяет объекты в кольцевом буфере. Однако это также затрудняет передачу двух разных типов сообщений в потоки бизнес-логики через Disruptor. Из того, что я могу сказать, у меня есть четыре варианта:

  • Настройте разрушитель на использование объектов, содержащих массив байтов с фиксированным размером (как рекомендовано Как следует использовать Disruptor (Pattern Disruptor) для сборки реальные системы сообщений?). В этом случае основной поток должен кодировать сообщения в массивы байтов, прежде чем публиковать их в прерыватель, и каждый из потоков бизнес-логики должен декодировать массивы байтов обратно в объекты при получении. Недостатком этой настройки является то, что потоки бизнес-логики по-настоящему не разделяют память от разрушителя - вместо этого они создают новые объекты (и, таким образом, создают мусор) из массива байтов, предоставляемого разрушителем. Поверхность этой установки заключается в том, что все потоки бизнес-логики могут считывать несколько разных типов сообщений от одного и того же прерывателя.

  • Настройте разрушитель на использование одного типа объектов, но создайте несколько разрывов, по одному для каждого типа объекта. В приведенном выше случае были бы два отдельных разрушителя: один для объектов типа A и другой для объектов типа B. Поверхность этой установки заключается в том, что основной поток не должен кодировать объект в массив байтов, а бизнес-логические потоки могут использовать те же объекты, что и в разрушителе (без мусора). Недостатком этой настройки является то, что каким-то образом каждый поток бизнес-логики должен будет подписаться на сообщения от нескольких разрывов.

  • Сконфигурируйте разрушитель для использования одного типа "супер" объекта, который содержит все поля обоих сообщений A и B. Это очень против стиля OO, но позволит компромисс между опциями №1 и №2.

  • Настройте разладчик на использование ссылок на объекты. Тем не менее, в этом случае я теряю преимущества производительности предварительного распределения объектов и порядка памяти.

Что вы рекомендуете для этой ситуации? Я считаю, что вариант №2 - самое чистое решение, но я не знаю, могут ли пользователи технически подписываться на сообщения от нескольких разрывов. Если кто-то может предоставить пример того, как реализовать вариант № 2, это было бы очень полезно!

4b9b3361

Ответ 1

Настройте искатель на использование объектов, содержащих массив байтов фиксированного размера (как рекомендовано как следует использовать Disruptor (Pattern Disruptor) для сборки реальные системы сообщений?). В этом случае основной поток должен кодировать сообщения в байтовые массивы, прежде чем публиковать их в разрыве и каждый потоков бизнес-логики должны декодировать байтовые массивы обратно в объекты по получении. Недостатком этой настройки является то, что потоки бизнес-логики на самом деле не разделяют память от разрушителя - вместо этого они создание новых объектов (и, таким образом, создание мусора) из массива байтов предоставляемый разрушителем. Поверхность этой установки заключается в том, что все деловые логические потоки могут читать несколько разных типов сообщений от одного и того же разрушитель.

Это был бы мой предпочтительный подход, но я слегка покрасил наши случаев использования, примерно в каждом месте, которое мы использовали Disruptor it либо получать, либо отправлять какие-либо устройства ввода-вывода, поэтому наши Базовая валюта - это байтовые массивы. Вы можете обойти создание объекта используя мухи подход к сортировке. Чтобы увидеть пример это, я использовал классы Javolution Struct и Union в примере, который Я представил в Devoxx (https://github.com/mikeb01/ticketing). если ты может полностью справиться с объектом перед возвратом из onEvent вызов из обработчика событий, этот подход работает хорошо. Если событие должно жить дальше этого момента, тогда вам нужно сделать что-то вроде копии данных, например. де-сериализации его в объект.

Настройте разрушитель на использование одного типа объектов, но создайте несколько разрывы, по одному для каждого типа объекта. В приведенном выше случае два отдельных разрушителя - один для объектов типа A и другой для объектов типа B. Поверхность этой установки заключается в том, что основной поток не должен закодировать объект в байтовый массив, а бизнес-логические потоки могут делить те же объекты, что и в разрушителе (без мусора). недостатком этой настройки является то, что как-то каждый поток бизнес-логики будет иметь для подписки на сообщения от нескольких разрывов.

Не пробовал этот подход, вам, вероятно, понадобится настраиваемый EventProcessor который может опросить несколько буферных буферов.

Настройте разладчик на использование одного типа "супер" объекта, который содержит все поля обоих сообщений A и B. Это очень против стиля OO, но будет разрешить компромисс между опциями №1 и №2. Настройте разладчик на использование ссылок на объекты. Однако в этом случае я теряют преимущества в производительности предварительного распределения объектов и упорядочивания памяти.

Мы сделали это в нескольких случаях, когда в некоторых случаях, когда превалирование допустимо. Все работает нормально. Если вы проходите объекты, то вам нужно убедиться, что вы их обнулите, как только вы закончил с ними на стороне потребителя. Мы обнаружили, что использование двойной шаблон отправки для "супер" объекта сохранял реализацию чистый. Один из недостатков этого заключается в том, что он будет немного длиннее GC останавливается, что с чем-то, что было прямым массивом объектов, как GC имеет больше живых объектов для прохождения во время фазы метки.

Что вы рекомендуете для этой ситуации? Я считаю, что вариант № 2 - это чистое решение, но я не знаю, могут ли или как потребители технически подписываться на сообщения от нескольких разрывов. Если кто-либо может предоставить пример того, как реализовать вариант № 2, было бы очень признательно!

Другой вариант, если вы хотите получить полную гибкость в отношении использование данных, заключается в том, чтобы не использовать кольцевой буфер, а вместо этого говорить напрямую в Sequencer и определите свой макет объекта, как вы лучше всего увидите фитинг.

Ответ 2

Бен Баумголд, я уверен, что вы нашли решение к настоящему времени. Ваш №4 (или # 3) можно реализовать тривиально, создав держатель события. Подумайте об этом как о перечислении объектов. Чтобы ускорить поиск, события должны быть обогащены типом перечисления. Обратите внимание: я сохраняю ссылку на оригинальное событие в держателе. Возможно, более целесообразно создать конструктор копирования или clone() и скопировать события при вставке в кольцевой буфер.

Иллюстрируя пример:

//это перечисление, используемое в событиях

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

//это владелец. В любой момент этот экземпляр в ringbuffer содержит только одно событие, индексированное массивом [type.ordinal()]. почему массив должен быть очевиден из кода.

public class RingBufferEventHolder {    
 private MyEventEnum;   
 private EventBase array[];

 public RingBufferEventHolder() {
    array=new EventBase[MyEventEnum.values().length]; 
 }

 // TODO: null the rest
 public void setEvent(EventBase event) {
    type=event.getType();
    switch( event.getType() ) {
        case EVENT_TIMER:
            array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
            break;
        case EVENT_MARKETDATA:
            array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
            break;
        default:
            throw new RuntimeException("Unknown event type " + event );
    }
}

//публиковать событие

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);