ThreadPoolExecutor с неограниченной очередью, не создающей новые потоки - программирование
Подтвердить что ты не робот

ThreadPoolExecutor с неограниченной очередью, не создающей новые потоки

My ThreadPoolExecutor не создает новые потоки. На самом деле я написал несколько хакерских LinkedBlockingQueue, которые будут принимать любую задачу (т.е. Она неограничена), но вызовет дополнительный обработчик, который в моем приложении выводит предупреждающую трассировку о том, что пул отстает - что дает мне очень ясную информацию о том, что TPE отказываясь создавать новые потоки, даже если в очереди есть тысячи записей. Мой конструктор выглядит следующим образом:

private final ExecutorService s3UploadPool = 
new ThreadPoolExecutor(1, 40, 1, TimeUnit.HOURS, unboundedLoggingQueue);

Почему это не создает новые потоки?

4b9b3361

Ответ 1

Эта ошибка описана в этом блоге:

Эта конструкция пула потоков просто не будет работать, как ожидалось. Это связано с логикой в ThreadPoolExecutor, где новые потоки добавляются, если не удается предложить задачу в очередь. В нашем случае мы используем неограниченную LinkedBlockingQueue, где мы всегда можем предложить задачу в очередь. Это фактически означает, что мы никогда не будем расти выше размера основного пула и до максимального размера пула.

Если вам также необходимо отделить минимум от максимальных размеров пула, вам придется выполнить некоторое расширенное кодирование. Я не знаю о решении, которое существует в библиотеках Java или Apache Commons. Решение состоит в том, чтобы создать связанный BlockingQueue который знает о TPE, и приложит все усилия, чтобы отклонить задачу, если он знает, что у TPE нет доступных потоков, а затем выполнить ручную обработку. Более подробно это описано в связанном посте. В конечном итоге ваша конструкция будет выглядеть так:

public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) {
   ScalingQueue queue = new ScalingQueue();
   ThreadPoolExecutor executor =
      new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue);
   executor.setRejectedExecutionHandler(new ForceQueuePolicy());
   queue.setThreadPoolExecutor(executor);
   return executor;
}

Однако проще установить для corePoolSize значение maxPoolSize и не беспокоиться об этой maxPoolSize.

Ответ 2

Как упоминалось в @djechlin, это часть (удивительного для многих) определенного поведения ThreadPoolExecutor. Я считаю, что я нашел несколько элегантное решение вокруг этого поведения, которое я покажу в своем ответе здесь:

Как заставить ThreadPoolExecutor увеличить потоки до max перед очередью?

В основном вы расширяете LinkedBlockingQueue, чтобы он всегда возвращал false для queue.offer(...), который при необходимости добавляет дополнительные потоки в пул. Если пул уже находится в максимальных потоках, и все они заняты, будет вызываться RejectedExecutionHandler. Это обработчик, который затем выполняет put(...) в очереди.

Смотрите мой код там.

Ответ 3

Существует обходной путь к этой проблеме. Рассмотрим следующую реализацию:

int corePoolSize = 40;
int maximumPoolSize = 40;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
    60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.allowCoreThreadTimeOut(true);

Установив allowCoreThreadTimeOut() на true, потоки в пуле могут заканчиваться после указанного таймаута (60 секунд в этот пример). С помощью этого решения конструктор конструктора corePoolSize определяет размер максимального пула на практике, поскольку пул потоков будет расти до corePoolSize, а затем начнет добавлять задания в очередь. Вероятно, пул может никогда не расти больше, потому что пул не будет порождать новые потоки, пока очередь не будет заполнена (что, учитывая, что LinkedBlockingQueue имеет емкость Integer.MAX_VALUE, никогда не может произойти). Следовательно, нет смысла устанавливать maximumPoolSize на большее значение, чем corePoolSize.

Рассмотрение: пул потоков имеет 0 холостых потоков после истечения времени ожидания, а это означает, что перед созданием потоков будет некоторая латентность (обычно у вас всегда есть потоки corePoolSize).

Более подробную информацию можно найти в JavaDoc ThreadPoolExecutor.