Подтвердить что ты не робот

ExecutorService, стандартный способ избежать слишком большой очереди задач

Я использую ExecutorService для удобства параллельной многопоточной программы. Возьмите следующий код:


while(xxx)
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
 ...  
 Future<..> ... = exService.submit(..);
 ...
}

В моем случае проблема заключается в том, что submit() не блокируется, если заняты все NUMBER_THREADS. Следствием этого является то, что очередь задач наводняется многими задачами. Следствием этого является то, что закрытие службы выполнения с помощью ExecutorService.shutdown() занимает много времени (ExecutorService.isTerminated() будет ложным в течение длительного времени). Причина в том, что очередь задач все еще достаточно полная.

В настоящее время моим обходным решением является работа с семафорами, чтобы запретить иметь много записей внутри очереди задач ExecutorService:


...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx)
 ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
 ...
 semaphore.aquire();  
 // internally the task calls a finish callback, which invokes semaphore.release()
 // -> now another task is added to queue
 Future<..> ... = exService.submit(..); 
 ...
}

Я уверен, что есть лучшее более инкапсулированное решение?

4b9b3361

Ответ 1

Хитрость заключается в использовании фиксированного размера очереди и:

new ThreadPoolExecutor.CallerRunsPolicy()

Я также рекомендую использовать Guava ListeningExecutorService. Вот пример очередей потребителей/производителей.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}

Что-нибудь лучше, и вы можете рассмотреть MQ, например, RabbitMQ или ActiveMQ, поскольку у них есть технология QoS.

Ответ 2

Вы можете ThreadPoolExecutor.getQueue(). size(), чтобы узнать размер очереди ожидания. Вы можете выполнить действие, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить работу производителя (если это подходит)

Ответ 3

Вам лучше создать ThreadPoolExecutor (что и делает Executors.newXXX()).

В конструкторе вы можете передать BlockingQueue для Исполнителя для использования в качестве своей очереди задач. Если вы передадите размер BlockingQueue с ограничением размера (например, LinkedBlockingQueue), он должен достичь желаемого эффекта.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));

Ответ 4

Истинная блокировка ThreadPoolExecutor была в списке желаний многих, там даже ошибка JDC открылась на нем. Я столкнулся с той же проблемой и наткнулся на это: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Это реализация BlockingThreadPoolExecutor, реализованная с использованием RejectionPolicy, которая использует предложение для добавления задачи в очередь, ожидая, что очередь будет иметь место. Выглядит хорошо.

Ответ 5

вы можете добавить еще одну бляковую очередь с ограниченным размером, чтобы контролировать размер внутренней очереди в executorService, некоторые думают как семафор, но очень легко. перед исполнителем вы ставите(), и когда задается задача take(). take() должен находиться внутри кода задачи