Подтвердить что ты не робот

Является ли добавление задач к BlockingQueue ThreadPoolExecutor целесообразным?

JavaDoc для ThreadPoolExecutor неясно, можно ли добавлять задачи непосредственно в BlockingQueue, поддерживая исполнителя. Документы говорят, что

Я создаю ThreadPoolExecutor со своим BlockingQueue. Я сохраняю ссылку на очередь, поэтому я могу добавлять задачи к ней напрямую. Та же очередь возвращается getQueue(), поэтому я предполагаю, что предупреждение в getQueue() применяется к ссылке на резервную очередь, полученную с помощью моих средств.

Пример

Общий шаблон кода:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

Как я понимаю, типичное использование заключается в добавлении задач через executor.execute(). Подход в моем примере выше имеет преимущество блокировки в очереди, тогда как execute() не работает немедленно, если очередь заполнена и отклоняет мою задачу. Мне также нравится, что отправка заданий взаимодействует с блокирующей очередью; это кажется мне более "чистым" производителем-потребителем.

Непосредственное включение задач в очередь: я должен вызвать prestartAllCoreThreads(), иначе рабочие потоки не будут запущены. Предполагая, что никаких других взаимодействий с исполнителем нет, ничего не будет отслеживать очередь (обследование источника ThreadPoolExecutor подтверждает это). Это также подразумевает прямой привязку к тому, что ThreadPoolExecutor должен быть дополнительно настроен для > 0 основных потоков и не должен быть сконфигурирован так, чтобы позволить потокам ядра к тайм-ауту.

TL;DR

Учитывая ThreadPoolExecutor, настроенный следующим образом:

основные потоки > 0 Основные потоки не допускаются к тайм-ауту основные потоки предварительно настроены держите ссылку на BlockingQueue, поддерживая исполнителя

Допустимо ли добавлять задачи непосредственно в очередь вместо вызова executor.execute()?

Похожие

Этот вопрос () аналогичен, но конкретно не предусматривает непосредственного добавления в очередь.

4b9b3361

Ответ 1

Если бы это был я, я предпочел бы использовать Executor#execute() над Queue#offer(), просто потому, что уже использовал все остальное из java.util.concurrent.

Ваш вопрос хороший, и он вызвал мой интерес, поэтому я взглянул на источник для ThreadPoolExecutor#execute():

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

Мы можем видеть, что сам выполнение вызывает offer() в рабочей очереди, но не до того, как при необходимости принести приятные, вкусные пустые манипуляции. По этой причине я думаю, что было бы целесообразно использовать execute(); не используя его (хотя я не знаю наверняка) заставляют пул работать неоптимально. Тем не менее, я не думаю, что использование offer() приведет к поломке исполнителя - похоже, что задачи выведены из очереди с использованием следующего (также из ThreadPoolExecutor):

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

Этот метод getTask() вызывается только из цикла, поэтому, если исполнитель не выключается, он блокируется до тех пор, пока в очередь не будет задана новая задача (независимо от того, откуда она была).

Примечание. Несмотря на то, что я опубликовал фрагменты кода из исходного кода, мы не можем полагаться на них для окончательного ответа - мы должны только кодировать API. Мы не знаем, как реализация execute() со временем изменится.

Ответ 2

Один трюк заключается в реализации пользовательского подкласса ArrayBlockingQueue и переопределения метода offer() для вызова вашей блокирующей версии, тогда вы все равно можете использовать обычный путь кода.

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(как вы, наверное, можете догадаться, я думаю, что вызов предлагается прямо в очереди, поскольку ваш обычный путь к коду, вероятно, является плохой идеей).

Ответ 3

Можно реально настроить поведение пула, когда очередь заполнена, указав RejectedExecutionHandler при создании экземпляра. ThreadPoolExecutor определяет четыре политики как внутренние классы, включая AbortPolicy, DiscardOldestPolicy, DiscardPolicy, а также мой личный фаворит CallerRunsPolicy, который запускает новое задание в управляющем потоке.

Например:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

Поведение, заданное в вопросе, может быть получено с помощью следующего:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

В какой-то момент очередь должна быть доступна. Лучшее место для этого - это автономный RejectedExecutionHandler, который сохраняет дублирование кода или потенциальные ошибки, возникающие в результате непосредственного манипулирования очередью в области объекта пула. Обратите внимание, что обработчики, включенные в ThreadPoolExecutor, сами используют getQueue().

Ответ 4

Это очень важный вопрос, если используемая вами очередь представляет собой совершенно другую реализацию из стандартной встроенной памяти LinkedBlockingQueue или ArrayBlockingQueue.

Например, если вы реализуете шаблон производителя-потребителя с использованием нескольких производителей на разных машинах и используете механизм очередей, основанный на отдельной подсистеме сохранения (например, Redis), тогда вопрос становится актуальным сам по себе, даже если вы не хотите блокировать offer() как OP.

Итак, данный ответ, что prestartAllCoreThreads() должен быть вызван (или достаточно раз prestartCoreThread()), чтобы рабочие потоки были доступны и запущены, достаточно важно подчеркнуть.

Ответ 5

При необходимости мы также можем использовать паркинг, который отделяет основную обработку от отклоненных задач -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();