Подтвердить что ты не робот

Почему ThreadPoolExecutor имеет BlockingQueue в качестве аргумента?

Я попытался создать и выполнить ThreadPoolExecutor с помощью

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

Если я пытаюсь выполнить 7-ю, 8-ю... задачу непрерывно

  threadPool.execute(task);  

после того, как очередь достигла максимального размера
, он начинает метать "RejectedExecutionException". Значит, я потерял добавление этих задач.

В этом случае, какова роль BlockingQueue, если у нее отсутствуют задачи? Значит, почему он не ждет?

Из определения BlockingQueue

Очередь, которая дополнительно поддерживает операции, ожидающие очереди стать непустым при извлечении элемента и дождаться места становятся доступными в очереди при хранении элемента.


Почему мы не можем найти связанный список (обычная реализация очереди вместо блокировки очереди)?

4b9b3361

Ответ 1

Проблема возникает из-за слишком маленькой очереди задач, о чем свидетельствует документация метода execute:

Выполняет задание в будущем. Задача может выполняться в новом потоке или в существующем объединенном потоке. Если задача не может быть отправлена ​​на исполнение, либо из-за того, что этот исполнитель был остановлен, либо из-за того, что его емкость была достигнута, задача обрабатывается текущим RejectedExecutionHandler.

Итак, первая проблема заключается в том, что вы устанавливаете размер очереди на очень небольшое число:

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

И затем вы указываете: "Если [I] попытается выполнить 7-ю, 8-ю... задачу", вы получите RejectedExecutionException, потому что вы пропустили пропускную способность очереди. Есть два способа решить вашу проблему (я бы рекомендовал сделать то и другое):

  • Увеличьте размер очереди.
  • Вызовите исключение и повторите попытку добавления задачи.

У вас должно быть что-то в этом роде:

public void ExecuteTask(MyRunnableTask task) {
    bool taskAdded = false;
    while(!taskAdded) {
        try {
            executor.execute(task);
            taskAdded = true;
        } catch (RejectedExecutionException ex) {
            taskAdded = false;
        }
    }   
}

Теперь, чтобы ответить на ваши другие вопросы...

В этом случае, какова роль BlockingQueue, если у нее отсутствуют задачи?

Роль BlockingQueue заключается в заполнении шаблона Продюсер/Потребитель, и если он достаточно велик, то вы не должны видеть проблемы, с которыми вы сталкиваетесь. Как я уже упоминал выше, вам нужно увеличить размер очереди и поймать исключение, а затем повторить выполнение задачи.

Почему мы не можем найти связанный список?

Связанный список не является ни потокобезопасным, ни блокирующим. Образец производителя/потребителя имеет тенденцию работать лучше всего с блокирующей очередью.

Update

Пожалуйста, не обижайтесь на следующие утверждения, я намеренно использую более строгий язык, чтобы подчеркнуть тот факт, что ваше первое предположение никогда не должно состоять в том, что что-то не так с библиотекой, которую вы используете ( если вы сами не написали библиотеку, и вы знаете, что в ней есть определенная проблема)!

Итак, позвольте поставить это беспокойство прямо сейчас: здесь не проблема ThreadPoolExecutor и библиотека Java. Это полностью ваше (неправильное) использование библиотеки, которая вызывает проблему. Javmex имеет отличный учебник, объясняющий точную ситуацию, которую вы видите.

Может быть несколько причин, по которым вы заполняете очередь быстрее, чем вы ее опорожняете:

  • Нить, добавляющая задачи для выполнения, добавляет их слишком быстро.
  • Задачи выполняются слишком долго, чтобы выполнить.
  • Ваша очередь слишком мала.
  • Любая комбинация из вышеуказанных 3.

Есть и ряд других причин, но я думаю, что это было бы самым распространенным.

Я бы предложил вам простое решение с неограниченной очередью, но это НЕ разрешило бы ваше (неправильное) использование библиотеки. Итак, прежде чем мы будем обвинять библиотеку Java, давайте рассмотрим краткий пример, демонстрирующий точную проблему, с которой вы сталкиваетесь.

Обновление 2.0

Вот несколько других вопросов, касающихся конкретной проблемы:

Ответ 2

Блокирующая очередь предназначена главным образом для потребителей (потоки в пуле). Потоки могут ждать появления новых задач в очереди, они будут автоматически разбужены. Простой связанный список не будет служить этой цели.

На стороне производителя поведение по умолчанию - это выброс исключения, если очередь заполнена. Это можно легко настроить, реализовав собственный RejectedExceptionHandler. В вашем обработчике вы можете получить доступ к очереди и вызвать метод put, который будет блокироваться до тех пор, пока не станет доступно больше места.

Но это нехорошо делать - причина в том, что если в этом исполнителе есть проблема (тупик, медленная обработка), это вызовет эффект пульсации для остальной части системы. Например, если вы вызываете метод execute из сервлета - если метод выполнения блокирует, то все потоки контейнеров будут задержаны, и ваше приложение остановится. Вероятно, это причина, по которой поведение по умолчанию заключается в том, чтобы выбросить исключение и не ждать. Также не существует реализации RejectedExceptionHandler, который делает это - чтобы препятствовать людям использовать его.

Существует опция (CallersRunPolicy) для выполнения в вызывающем потоке, который может быть другим вариантом, если вы хотите, чтобы обработка выполнялась.

Общее правило: лучше обработать один запрос с ошибкой, а не свести всю систему вниз. Возможно, вы захотите прочитать об circuit-breaker.

Ответ 3

Вы не используете BlockingQueue в том виде, как он предназначен для использования.

A BlockingQueue используется для реализации шаблона производителя. Потоки продюсеров помещают элементы в очередь через блокирующий метод put(), тогда как потребительский поток принимает элементы из очереди через блокировка take().

put() blocks - это означает, что если очередь заполнена, она ждет, пока потребитель не отправит элемент из очереди, прежде чем добавлять его, а затем вернется.

take() block - это означает, что если очередь пуста, она ждет, пока производитель поставит элемент в очередь, прежде чем принимать его и вернуть.

Этот шаблон полностью отключает производителей от потребителей, за исключением того, что они разделяют очередь.

Попробуйте использовать такую ​​очередь: попросите исполнителя выполнить некоторые потоки, которые действуют как производители, а некоторые - как потребители.

Ответ 4

Вы можете найти цель BlockingQueue здесь

/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

BlockingQueue полезен для обеспечения хорошего решения проблемы производителя. Вы можете найти его в приведенном ниже примере:

Проигрыватель/Потребительские потоки с использованием очереди

Что касается RejectedExecutionException, вы не установили правильный размер для BlockingQueue.

Ответ 5

Из вашей настройки запрос может быть отклонен, если одновременно больше 5. Поскольку порядок последовательности будет иметь размер пула → queue → max pool size.

  • Создаются два итеративных потока
  • После этого, если запрос будет превышать 2, запрос подпоследовательности будет помещен в очередь.
  • Если очередь заполнена (для которой вы настроены 2), будет создан новый поток, но не будет превышать максимальный размер пула (который вы настроили 3).
  • Если поступит больше запросов, и весь поток/рабочий занят, а очередь заполнена, запрос будет отклонен (с учетом конфигурации rejectPolicy).

Подробнее можно прочитать здесь: https://dzone.com/articles/scalable-java-thread-pool-executor