Подтвердить что ты не робот

Одновременная обработка сообщений в строгом порядке

В моем веб-приложении JavaEE мне нужно обрабатывать входящие сообщения строго в порядке прибытия. Я предполагаю, что мой контейнер webapp (Tomcat 6) сохраняет порядок сообщений по мере их поступления в порт http.

Что вызывает у меня головные боли, так это то, как я внутренне обрабатываю эти сообщения. Для улучшения рабочей нагрузки я добавляю обработку каждого сообщения в ThreadPool, так как многое нужно сделать здесь, например. Разбор XML, иногда обогащение данных с использованием внешних веб-сервисов. По завершении обработки я выдвигаю java-представление сообщения в сложный механизм обработки потока esper.codehaus.org, который является потокобезопасным. Здесь проверяются разные шаблоны, где порядок входа является самым высоким требованием, например, порог явления превышает.

У меня возникла идея вставить каждое обработанное сообщение в PriorityQueue с идентификатором приоритета, который они получили во время прибытия (в моем сервлете, где он увеличивается для каждого сообщения). Проблема заключается в следующем:

Поток, который проверяет элементы из очереди (самый низкий идентификатор - это глава очереди), чтобы вставить его в esper, можно пропустить идентификатор, поскольку он не проверяет отсутствующие элементы. Я предполагаю, что иллюстрация работает лучше:

enter image description here

Для шагов (1) - (4) все работает по назначению. Но на этапе (5) QueuePoller извлекает элемент 6, а не элемент 4 (который позже вставлен на этапе (6)). Это приводит к порядку сообщения: 2; 3; 6; 4.

То, что я пытался сделать, это изменить реализацию опроса головы очереди, чтобы выполнить строгий порядок идентификаторов. Смысл, если следующий идентификационный элемент еще не вставлен в очередь, подождите до барьера до его появления. Казалось, что это работает в течение первых 10 минут, но затем повешено, возможно, из-за элемента, который никогда не вставлен в очередь.

У кого-то есть аналогичная проблема в прошлом и есть какой-то намек на меня?

4b9b3361

Ответ 1

Ознакомьтесь с Disruptor - очередь высокая производительность при строгом порядке (сначала введите - первым обслужен)

Ответ 2

Вы можете сразу добавить местозаполнитель для входящих запросов в очередь обработки. Заполнитель предварительно обрабатывается пулом потоков, но основная обработка ожидает завершения предварительной обработки. Конструкция, которую я имею в виду, это Future.

Ответ 3

Библиотека классов обеспечивает гибкую реализацию пула потоков вместе с некоторыми полезными предопределенными конфигурациями. Вы можете создать пул потоков, вызвав один из статических методов factory в Executors:

Для ваших нужд я думаю, что Executors.newSingleThreadExecutor() лучше. Однопоточный исполнитель создает один рабочий поток для обработки задач, заменяя его, если он неожиданно умирает. Гарантируется, что задачи будут обрабатываться последовательно в соответствии с порядком, заданным в очереди задач (FIFO, LIFO, приоритетный порядок).

Ответ 4

Как свидетельствует ваша проблема и необходимость диаграммы (+1 для этого, кстати), приоритетная очередь не является хорошей конструкцией для того, что вы хотите. Это связано с тем, что очередь с удовольствием служит вам доступным 6, а не ждать недоступного 4.

Я думаю, что пришло время запустить собственный синхронизированный контейнер.