Подтвердить что ты не робот

Невозможно создать кэшированный пул потоков с ограничением размера?

Кажется, невозможно создать кэшированный пул потоков с ограничением количества потоков, которые он может создать.

Вот как статический Executors.newCachedThreadPool реализован в стандартной библиотеке Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Итак, используя этот шаблон для создания пула кешированных файлов с фиксированным размером:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Теперь, если вы используете это и отправляете 3 задания, все будет хорошо. Отправка любых дальнейших задач приведет к отклоненным исключениям выполнения.

Попытка:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Приведёт к тому, что все потоки будут выполняться последовательно. Например, пул потоков никогда не будет создавать более одного потока для выполнения ваших задач.

Это ошибка в методе выполнения ThreadPoolExecutor? Или, может быть, это намеренно? Или есть другой способ?

Изменить: я хочу что-то точно такое, как пул кешированных потоков (он создает потоки по требованию, а затем убивает их после некоторого таймаута), но с ограничением количества потоков, которые он может создать, и возможностью продолжать очереди дополнительных задач как только он достигнет предела потока. Согласно sjlee-ответу это невозможно. Глядя на метод execute() ThreadPoolExecutor, это действительно невозможно. Мне нужно было бы подклассировать ThreadPoolExecutor и переопределить execute(), как это делает SwingWorker, но то, что SwingWorker делает в своем execute(), является полным взломом.

4b9b3361

Ответ 1

ThreadPoolExecutor имеет следующие несколько ключевых вариантов поведения, и эти проблемы могут быть объяснены вашими проблемами.

Когда задачи отправлены,

  1. Если пул потоков не достиг базового размера, он создает новые потоки.
  2. Если достигнут размер ядра и нет свободных потоков, он ставит задачи в очередь.
  3. Если достигнут размер ядра, нет свободных потоков, и очередь заполняется, создаются новые потоки (до достижения максимального размера).
  4. Если достигнут максимальный размер, неиспользуемые потоки отсутствуют, и очередь заполняется, включается политика отклонения.

В первом примере обратите внимание, что SynchronousQueue по существу имеет размер 0. Поэтому, как только вы достигнете максимального размера (3), политика отклонения вступит в действие (# 4).

Во втором примере предпочтительной очередью является LinkedBlockingQueue, размер которой неограничен. Таким образом, вы застряли с поведением № 2.

Вы не можете сильно повозиться с кэшированным типом или фиксированным типом, так как их поведение почти полностью определено.

Если вы хотите иметь ограниченный и динамический пул потоков, вам нужно использовать положительный размер ядра и максимальный размер в сочетании с конечным размером очереди. Например,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Приложение: это довольно старый ответ, и похоже, что JDK изменил свое поведение, когда дело доходит до размера ядра 0. После JDK 1.6, если размер ядра равен 0 и пул не имеет потоков, ThreadPoolExecutor добавит поток для выполнения этой задачи. Таким образом, размер ядра 0 является исключением из правила выше. Спасибо Стиву за внимание моего внимания.

Ответ 2

Если я что-то пропустил, решение первоначального вопроса прост. Следующий код реализует желаемое поведение, как описано в оригинальном плакате. Он будет содержать до 5 потоков для работы в неограниченной очереди, а незанятые потоки прекратятся через 60 секунд.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

Ответ 3

Была такая же проблема. Поскольку никакой другой ответ не ставит все проблемы вместе, я добавляю свой:

Теперь это ясно написано в docs: если вы используете очередь, которая не блокирует (LinkedBlockingQueue), то максимальная настройка нитей не влияет, только ядро используются потоки.

так:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Этот исполнитель имеет:

  • Нет концепции максимальных потоков, поскольку мы используем неограниченную очередь. Это хорошо, потому что такая очередь может заставить исполнителя создавать огромное количество непрофильных дополнительных потоков, если оно следует обычной политике.

  • Очередь максимального размера Integer.MAX_VALUE. Submit() будет бросать RejectedExecutionException, если количество ожидающих задач превышает Integer.MAX_VALUE. Не уверен, что сначала у нас закончится память, или это произойдет.

  • Имеет 4 основных потока. Простаивающие потоки ядра автоматически выходят, если они не работают в течение 5 секунд. Поэтому да, строго по требованию. Количество может быть изменено с помощью метода setThreads().

  • Уверен, что минимальное число основных потоков не меньше единицы, иначе Submit() отклонит каждую задачу. Поскольку основные потоки должны быть >= максимальные потоки, метод setThreads() также устанавливает максимальные потоки, хотя максимальная установка потока бесполезна для неограниченной очереди.

Ответ 4

В первом примере последующие задачи отклоняются, поскольку AbortPolicy является значением по умолчанию RejectedExecutionHandler. ThreadPoolExecutor содержит следующие политики, которые вы можете изменить с помощью метода setRejectedExecutionHandler:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Похоже, вы хотите использовать кешированный пул потоков с помощью CallerRunsPolicy.

Ответ 5

Ни один из ответов здесь не исправил мою проблему, связанную с созданием ограниченного количества HTTP-соединений с использованием HTTP-клиента Apache (версия 3.x). Поскольку мне потребовалось несколько часов, чтобы выяснить, какая хорошая настройка, я поделюсь:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Это создает ThreadPoolExecutor, который начинается с пяти и содержит максимум десять одновременных потоков, используя CallerRunsPolicy для выполнения.

Ответ 6

В Javadoc для ThreadPoolExecutor:

Если существует больше, чем corePoolSize, но меньше потоков maximumPoolSize, новый поток будет создан только в том случае, если очередь заполнена. Установив corePoolSize и maximumPoolSize то же самое, вы создаете пул потоков фиксированного размера.

(Подчеркните мой.)

Ответ на дрожание - это то, что вы хотите, хотя мой ответ на ваш другой вопрос.:)

Ответ 7

Это то, что вы хотите (по крайней мере, я так думаю). Для пояснения проверьте ответ Джонатана Фейнберга

Executors.newFixedThreadPool(int n)

Создает пул потоков, который повторно использует фиксированное количество потоков, работающих с общей неограниченной очередью. В любой момент, в большинстве случаев nThreads будут активными задачами обработки. Если дополнительные задачи передаются, когда все потоки активны, они будут ждать в очереди до тех пор, пока поток не будет доступен. Если какой-либо поток завершается из-за сбоя во время выполнения перед завершением работы, новый, если потребуется, займет свое место для выполнения последующих задач. Нити в пуле будут существовать до тех пор, пока они не будут явно отключены.

Ответ 8

есть еще один вариант. Вместо использования нового SynchronousQueue вы также можете использовать любую другую очередь, но вы должны убедиться, что ее размер равен 1, так что это заставит службу-исполнитель создать новый поток.

Ответ 9

Не похоже, что какой-либо из ответов действительно отвечает на вопрос - на самом деле я не вижу способа сделать это, даже если вы являетесь подклассом из PooledExecutorService, поскольку многие из методов/свойств являются частными, например. делая защиту addIfUnderMaximumPoolSize, вы можете сделать следующее:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Ближайший я получил это - но даже это не очень хорошее решение

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p.s. не проверено выше

Ответ 10

Вот еще одно решение. Я думаю, что это решение ведет себя так, как вы хотите (хотя и не гордитесь этим решением):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

Ответ 11

  • Вы можете использовать ThreadPoolExecutor как предложено @sjlee

    Вы можете управлять размером пула динамически. Посмотрите на этот вопрос для более подробной информации:

    Динамический пул потоков

    ИЛИ

  • Вы можете использовать newWorkStealingPool API, который был введен с помощью java 8.

    public static ExecutorService newWorkStealingPool()
    

    Создает пул потоков обработки, используя все доступные процессоры в качестве целевого уровня parallelism.

По умолчанию уровень parallelism установлен на количество ядер процессора на вашем сервере. Если у вас есть 4 основных сервера ЦП, размер пула потоков будет 4. Этот API возвращает тип ForkJoinPool ExecutorService и позволяет краже работы бездействующих потоков путем кражи задач из занятых потоков в ForkJoinPool.

Ответ 12

Проблема была обобщена следующим образом:

Я хочу что-то в точности как кешированный пул потоков (он создает потоки по требованию, а затем убивает их по истечении некоторого времени ожидания), но с ограничением на количество потоков, которые он может создать, и возможностью продолжать ставить в очередь дополнительные задачи после того, как он достигнет его. ограничение потока.

Прежде чем указывать на решение, я объясню, почему не работают следующие решения:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Это не будет ставить в очередь никакие задачи при достижении предела 3, потому что SynchronousQueue по определению не может содержать никаких элементов.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Это не создаст более одного потока, потому что ThreadPoolExecutor создает потоки, превышающие corePoolSize, только если очередь заполнена. Но LinkedBlockingQueue никогда не бывает полным.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Это не будет повторно использовать потоки, пока не будет достигнут corePoolSize, потому что ThreadPoolExecutor увеличивает количество потоков до тех пор, пока не будет достигнут corePoolSize, даже если существующие потоки простаивают. Если вы можете жить с этим недостатком, то это самое простое решение проблемы. Это также решение, описанное в разделе "Параллелизм Java на практике" (сноска на стр. 172).

Единственное полное решение описанной проблемы, по-видимому, заключается в переопределении метода очереди offer и написании RejectedExecutionHandler, как объяснено в ответах на этот вопрос: Как получить ThreadPoolExecutor, чтобы увеличить потоки до максимального значения до очереди?