Подтвердить что ты не робот

Размер основного пула и максимальный размер пула в ThreadPoolExecutor

В чем разница между размером основного пула и максимальным размером пула, когда мы говорим с точки зрения ThreadPoolExecutor? Можно ли это объяснить с помощью примера?

4b9b3361

Ответ 1

От этот пост в блоге:

Возьмите этот пример. Начальный размер пула потоков равен 1, размер пула ядра 5, максимальный размер пула равен 10, а очередь равна 100.

По мере поступления запросов, потоки будут созданы до 5, а затем задачи будут добавлены в очереди, пока не достигнет 100. Когда очередь будет заполнена, новые потоки будут созданный до maxPoolSize. После использования всех потоков и очередь - полные задачи, будут отклонены. Поскольку очередь уменьшается, количество активных потоков.

Ответ 2

ЕСЛИ запущенные темы> corePoolSize && Lt; maxPoolSize, а затем создайте новый поток, если очередь задач Total заполнена и приходит новая.

Форма документа: (Если запущено более corePoolSize, но менее MaximumPoolSize, новый поток будет создан, только если очередь заполнена.)

Теперь возьмите простой пример,

ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));

Здесь 5 - это corePoolSize - означает, что Jvm создаст новый поток для новой задачи для первых 5 задач. и другие задачи будут добавляться в очередь до тех пор, пока очередь не заполнится (50 задач).

10 - это maxPoolSize - JVM может создать до 10 потоков. Означает, что уже запущено 5 задач/потоков и очередь заполнена 50 ожидающими задачами, и если в очередь поступает еще один новый запрос/задача, JVM создаст новый поток до 10 (общее количество потоков = предыдущие 5 + новые 5) ;

new ArrayBlockingQueue (50)= - это общий размер очереди - он может поставить в очередь 50 задач.

как только все 10 потоков будут запущены, и если поступит новая задача, эта новая задача будет отклонена.

Правила внутреннего создания потоков с помощью SUN:

  1. Если число потоков меньше, чем corePoolSize, создайте новый поток, чтобы запустить новую задачу.

  2. Если количество потоков равно (или больше) corePoolSize, поместите задачу в очередь.

  3. Если очередь заполнена, а число потоков меньше, чем maxPoolSize, создайте новый поток для запуска задач.

  4. Если очередь заполнена, а число потоков больше или равно maxPoolSize, отклоните задачу.

Надеюсь, это HelpFul.. и, пожалуйста, поправьте меня, если я ошибаюсь...

Ответ 3

Из документа:

Когда новая задача отправляется при выполнении метода (java.lang.Runnable), и меньше, чем потоки corePoolSize, выполняется новый поток созданный для обработки запроса, даже если другие рабочие потоки простаивают. Если их больше, чем corePoolSize, но меньше, чем maximumPoolSize потоки выполняются, новый поток будет создан только в том случае, если очередь полный.

Далее

Установив corePoolSize и maximumPoolSize то же самое, вы создаете пул потоков фиксированного размера. Установив maxPoolSize по существу неограниченное значение, такое как Integer.MAX_VALUE, вы разрешаете пулу допускает произвольное количество одновременных задач. Как правило, ядро и максимальные размеры бассейна устанавливаются только при строительстве, но они также можно динамически изменять с помощью setCorePoolSize (int) и setMaximumPoolSize (целое).

Ответ 4

Если вы решили создать ThreadPoolExecutor вручную вместо использования класса Executors factory, вам нужно будет создать и настроить его с помощью одного из его конструкторов. Самый обширный конструктор этого класса:

public ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAlive,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler
);

Как вы можете видеть, вы можете настроить:

  • Размер основного пула (размер пула потоков будет пытаться придерживаться).
  • Максимальный размер пула.
  • Время сохранения, которое является временем, после которого простаивающий поток может быть разорван.
  • Рабочая очередь для выполнения задач, ожидающих выполнения.
  • Политика, применяемая при отклонении представления задачи.

Ограничение количества заданий в очереди

Ограничение количества выполняемых параллельных задач, определение размера пула потоков, представляет собой огромную выгоду для вашего приложения и его среды исполнения с точки зрения предсказуемости и стабильности: неограниченное создание потоков в конечном итоге исчерпает ресурсы времени выполнения, и ваше приложение может испытать как следствие, серьезные проблемы с производительностью, которые могут привести даже к нестабильности приложения.

Это решение только одной части проблемы: вы ограничиваете количество выполняемых задач, но не ограничиваете количество заданий, которые могут быть отправлены и выставлены в очередь для последующего выполнения. В результате приложение будет испытывать нехватку ресурсов позже, но в конечном итоге это испытает его, если скорость подачи последовательно перерастает скорость выполнения.

Решение этой проблемы: Предоставление блокирующей очереди исполнителю для выполнения ожидающих задач. В случае заполнения очереди задание будет "отклонено". RejectedExecutionHandler вызывается, когда отсылка задания отклоняется, и поэтому отклоненный глагол был указан в предыдущем элементе. Вы можете реализовать свою собственную политику отклонения или использовать одну из встроенных политик, предоставляемых инфраструктурой.

Политики отклонения по умолчанию имеют исполнитель throw RejectedExecutionException. Однако другие встроенные политики позволяют:

  • Отменить задание молча.
  • Отмените старое задание и попробуйте повторно отправить последний.
  • Выполнение отклоненной задачи в потоке вызывающего.

Ответ 5

Вы можете найти определение терминов corepoolsize и maxpoolsize в javadoc. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

В приведенной выше ссылке есть ответ на ваш вопрос. Тем не менее, просто чтобы было ясно. Приложение будет продолжать создавать потоки, пока не достигнет corePoolSize. Я думаю, что идея состоит в том, что эти много потоков должны быть достаточными для обработки притока задач. Если новая задача возникает после создания потоков corePoolSize, задачи будут поставлены в очередь. Как только очередь будет заполнена, исполнитель начнет создавать новые потоки. Это своего рода балансировка. По сути, это означает, что приток задач больше, чем пропускная способность. Таким образом, Executor начнет создавать новые потоки снова, пока не достигнет максимального количества потоков. Опять же, новые потоки будут созданы тогда и только тогда, когда очередь будет заполнена.

Ответ 6

Хорошее объяснение в этом блоге:

Иллюстрация

public class ThreadPoolExecutorExample {

    public static void main (String[] args) {
        createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
        createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
        createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
    }

    private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
                                                                      String msg) {
        System.out.println("---- " + msg + " queue instance = " +
                                                  queue.getClass()+ " -------------");

        ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
                                 TimeUnit.NANOSECONDS, queue);

        for (int i = 0; i < 10; i++) {
            try {
                e.execute(new Task());
            } catch (RejectedExecutionException ex) {
                System.out.println("Task rejected = " + (i + 1));
            }
            printStatus(i + 1, e);
        }

        e.shutdownNow();

        System.out.println("--------------------\n");
    }

    private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
        StringBuilder s = new StringBuilder();
        s.append("poolSize = ")
         .append(e.getPoolSize())
         .append(", corePoolSize = ")
         .append(e.getCorePoolSize())
         .append(", queueSize = ")
         .append(e.getQueue()
                  .size())
         .append(", queueRemainingCapacity = ")
         .append(e.getQueue()
                  .remainingCapacity())
         .append(", maximumPoolSize = ")
         .append(e.getMaximumPoolSize())
         .append(", totalTasksSubmitted = ")
         .append(taskSubmitted);

        System.out.println(s.toString());
    }

    private static class Task implements Runnable {

        @Override
        public void run () {
            while (true) {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
}

Вывод:

---- Bounded queue instance = class java.util.concurrent.ArrayBlockingQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 4, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------

---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 2, corePoolSize = 2, queueSize = 4, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 2, corePoolSize = 2, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 2, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = 8
poolSize = 2, corePoolSize = 2, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9
poolSize = 2, corePoolSize = 2, queueSize = 8, queueRemainingCapacity = 2147483639, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------

---- Direct hand-off queue instance = class java.util.concurrent.SynchronousQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 3, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
Task rejected = 6
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
Task rejected = 7
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
Task rejected = 8
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------


Process finished with exit code 0

Ответ 7

java.util.concurrent.ThreadPoolExecutor

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

Ответ 8

Из книги Основы Java-конкуренции:

CorePoolSize: ThreadPoolExecutor имеет атрибут corePoolSize, который определяет, сколько потоков он запустит. пока новые потоки не начнутся, только когда очередь заполнена

MaximumPoolSize: этот атрибут определяет, сколько потоков запущено максимально. Вы можете установить это в Integer. MAX_VALUE, чтобы не было верхней границы