Подтвердить что ты не робот

Пул потоков, который связывает задачи для данного идентификатора с тем же потоком

Существуют ли какие-либо реализации пула потоков (в Java), которые обеспечивают выполнение всех задач для одного и того же логического идентификатора в одном и том же потоке?

Логика, за которой я пришел, - это если в конкретном потоке для данного логического идентификатора уже выполняется задача, тогда новые задачи с тем же идентификатором запланированы в том же потоке. Если нет потоков, выполняющих задачу для одного и того же идентификатора, тогда можно использовать любой поток.

Это позволит выполнять параллельные задачи для несвязанных идентификаторов, но задачи для того же ID, которые будут выполняться последовательно и в представленном порядке.

Если нет, есть ли предложения о том, как я могу расширить ThreadPoolExecutor, чтобы получить это поведение (если это возможно)?

UPDATE

Проведя больше размышлений об этом, я фактически не требую, чтобы задачи для одного и того же логического идентификатора выполнялись в одном потоке, просто они не выполнялись одновременно.

Примером для этого может служить система, которая обрабатывала заказы для клиентов, где было нормально обрабатывать несколько заказов одновременно, но не для одного и того же клиента (и все заказы для одного и того же клиента должны были обрабатываться в порядке).

Подход, который я принимаю на данный момент, - это использовать стандартный ThreadPoolExecutor с настраиваемым BlockingQueue, а также обернуть Runnable с помощью специальной оболочки. Логика оболочки Runnable:

  • Атомно попытайтесь добавить идентификатор к параллельному "запущенному" набору (ConcurrentHashMap), чтобы увидеть, запущена ли задача для того же ID
    • Если сбой добавления, нажмите задачу обратно на переднюю часть очереди и немедленно верните
    • если выполняется, продолжайте
  • Запустить задачу
  • Удалите связанный с задачей идентификатор из набора "running"

Методы очереди poll() затем возвращают только те задачи, у которых есть идентификатор, который в настоящее время не находится в "запущенном" наборе.

Проблема заключается в том, что я уверен, что будет много угловых случаев, о которых я не думал, поэтому потребуется много испытаний.

4b9b3361

Ответ 1

Создайте массив сервисов-исполнителей, выполняющих по одному потоку, и присвойте им записи в очередь хеш-кодом вашего идентификатора элемента. Массив может иметь любой размер, в зависимости от того, сколько потоков вы хотите использовать.

Это ограничит использование службы-исполнителя, но позволяет использовать ее возможность отключать единственный поток, если он больше не нужен (с allowCoreThreadTimeOut(true)) и перезапускать его по мере необходимости. Кроме того, все вещи в очереди будут работать без перезаписи.

Ответ 2

Простейшей идеей может быть следующее:

Имейте фиксированную карту BlockingQueue s. Используйте механизм хэша для выбора очереди на основе идентификатора задачи. Хэш-алгоритм должен выбрать одну и ту же очередь для тех же идентификаторов. Начните один поток для каждой очереди. каждый поток выберет одну задачу из собственной выделенной очереди и выполнит ее.

p.s. соответствующее решение сильно зависит от типа работы, которую вы назначаете для потоков

UPDATE

Хорошо, как насчет этой сумасшедшей идеи, пожалуйста, медведь со мной:)

Скажем, мы имеем ConcurrentHashMap, который содержит ссылки id -> OrderQueue

ID1->Q1, ID2->Q2, ID3->Q3, ...

Смысл, что теперь каждый id связан с собственной собственной очередью. OrderQueue - это настраиваемая блокирующая очередь с дополнительным булевым флагом - isAssociatedWithWorkingThread.

Существует также регулярный BlockingQueue, который мы будем называть amortizationQueue, теперь вы увидите, что он будет использоваться позже.

Затем мы имеем N рабочие потоки. Каждый рабочий поток имеет свою собственную рабочую очередь, которая является BlockingQueue, содержащей идентификаторы, связанные с этим потоком.

Когда приходит новый идентификатор, мы делаем следующее:

create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task to the queue
put id->OrderQueue to the map
put this OrderQueue to amortizationQueue

Когда приходит обновление для существующего идентификатора, мы делаем следующее:

pick OrderQueue from the map
put the task to the queue
if isAssociatedWithWorkingThread == false
    put this OrderQueue to amortizationQueue

Каждый рабочий поток выполняет следующие действия:

take next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue to amortizationQueue

Довольно просто. Теперь к забавной части - кражу работы:)

Если в некоторый момент времени какой-то рабочий поток оказывается с пустой рабочей очередью, то он делает следующее:

go to the pool of all working threads
pick one (say, one with the longest working queue)
steal id from *the tail* of that thread working queue
put this id to it own working queue
continue with regular execution

И еще +1 дополнительный поток, который обеспечивает работу по амортизации:

while (true)
    take next OrderQueue from amortizationQueue
    if queue is not empty and isAssociatedWithWorkingThread == false
         set isAssociatedWithWorkingThread=true
         pick any working thread and add the id to it working queue

Придется потратить больше времени на размышления, если вы можете уйти с флагом AtomicBoolean для isAssociatedWithWorkingThread, или есть необходимость сделать операцию блокировки для проверки/изменения этого флага.

Ответ 3

Мне приходилось иметь дело с подобной ситуацией в последнее время.

В итоге у меня был дизайн, похожий на ваш. Единственное различие заключалось в том, что "текущий" был скорее картой, а не набором: картой от ID до очереди Runnables. Когда оболочка вокруг задачи runnable видит, что ее идентификатор присутствует на карте, он добавляет задачу, запускаемую в очередь идентификаторов, и немедленно возвращается. В противном случае идентификатор добавляется на карту с пустой очередью и выполняется задача.

Когда задача выполнена, обертка снова проверяет очередь идентификаторов. Если очередь не пуста, выбирается runnable. В противном случае он удаляется с карты, и мы закончили.

Я останусь выключением и отменой в качестве упражнения для читателя:)

Ответ 4

Наш подход подобен тому, что находится в обновлении исходного вопроса. У нас есть класс-оболочка, который является runnable, который содержит очередь (LinkedTransferQueue), которую мы называем RunnableQueue. В runnable queue есть базовый API:

public class RunnableQueue implements Runnable
{
  public RunnableQueue(String name, Executor executor);
  public void run();

  public void execute(Runnable runnable);
}

Когда пользователь отправляет первый Runnable по вызову выполнения, RunnableQueue завершает себя в исполнителе. Последующие вызовы для выполнения получаются в очереди в очереди RunnableQueue. Когда runnable queue получает исполняемый ThreadPool (через его метод запуска), он начинает "истощать" внутреннюю очередь, последовательно выполняя исполняемые файлы один за другим. Если во время выполнения вызывается выполнение во RunnableQueue, новые runnables просто добавляются во внутреннюю очередь. После того, как очередь опущена, метод run runable queue завершается, и он "оставляет" пул исполнителей. Повторное полоскание.

У нас есть другие оптимизации, которые делают такие вещи, как только позволяет запустить некоторое количество runnables (например, четыре), прежде чем RunnableQueue снова запустится в пул исполнителей.

Единственный действительно сложный бит внутри, и это не так сложно) заключается в синхронизации вокруг, когда он отправляется исполнителю или нет, чтобы он не возвращался или не пропускал, когда он должен публиковать сообщения.

В целом мы находим, что это работает очень хорошо. "ID" (семантический контекст) для нас - это runnable queue. Необходимость, которую мы имеем (т.е. Плагин), имеет ссылку на RunnableQueue, а не на пул исполнителей, поэтому он вынужден работать исключительно через RunnableQueue. Это не только гарантирует, что все обращения являются последовательной последовательностью (ограничение потока), но позволяет RunnableQueue "умеренно" загружать загрузку плагина. Кроме того, для этого не требуется централизованная структура управления или другие споры.

Ответ 5

Расширение ThreadPoolExecutor было бы довольно сложным. Я бы предложил вам пойти на систему производителей-потребителей. Вот что я предлагаю.

  • Вы можете создавать типичные потребительские системы производителей. Ознакомьтесь с кодом, указанным в этом вопросе.
  • Теперь каждая из этих систем будет иметь очередь и поток Один потребитель, который будет последовательно обрабатывать задачи в очереди
  • Теперь создайте пул таких отдельных систем.
  • Когда вы отправляете задание для связанного идентификатора, смотрите, есть ли система, помеченная для этого связанного идентификатора, который в настоящее время обрабатывает задачи, если да, то отправьте задачи,
  • Если не обрабатывать какие-либо задачи, тогда отметьте эту систему с этим новым связанным идентификатором и отправьте задачу.
  • Таким образом, одна система будет обслуживать только один логический идентификатор.

Здесь я предполагаю, что связанный идентификатор - это логический набор отдельных идентификаторов, а созданные потребительские системы будут созданы для связанных идентификаторов и НЕ индивидуальных идентификаторов.