Подтвердить что ты не робот

Элегантное внедрение индикаторов длины очереди в ExecutorServices

Почему, а почему java.util.concurrent не предоставляет индикаторы длины очереди для своих ExecutorService s? Недавно я обнаружил, что делаю что-то вроде этого:

ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...

public void addTaskToQueue(Runnable runnable) {
    if (queueLength.get() < MAX_QUEUE_LENGTH) {
        queueLength.incrementAndGet(); // Increment queue when submitting task.
        queue.submit(new Runnable() {
            public void run() {
                runnable.run();
                queueLength.decrementAndGet(); // Decrement queue when task done.
            }
        });
    } else {
        // Trigger error: too long queue
    }
}

Что работает нормально, но... Я думаю, что это действительно должно быть реализовано как часть ExecutorService. Он немой и ошибка, склонная переносить счетчик, отделенный от фактической очереди, длина которой указывает счетчик (напоминает мне массивы C). Но ExecutorService получаются с помощью статических методов factory, поэтому нет возможности просто расширить отличный исполнитель одиночного потока и добавить счетчик очереди. Итак, что мне делать:

  • Reinvent, уже реализованный в JDK?
  • Другое умное решение?
4b9b3361

Ответ 1

Существует более прямой способ:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();

Хотя вы можете не использовать удобные методы создания Executor, а скорее создать исполнителя непосредственно, чтобы избавиться от приведения, и, таким образом, убедитесь, что исполнитель всегда будет ThreadPoolExecutor, даже если реализация Executors.newSingleThreadExecutor изменится когда-нибудь.

ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );

Это прямое копирование из Executors.newSingleThreadExecutor в JDK 1.6. LinkedBlockingQueue, который передается конструктору, на самом деле является тем самым объектом, с которого вы вернетесь из getQueue.

Ответ 2

Пока вы можете напрямую проверить размер очереди. Другой способ справиться с очередью, которая слишком длинна, - сделать внутреннюю очередь ограниченной.

public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                              5000L, TimeUnit.MILLISECONDS,
                              new ArrayBlockingQueue<Runnable>(queueSize, true));
}

Это приведет к RejectedExecutionExceptions (см. this), когда вы превысите предел.

Если вы хотите избежать исключения, вызывающий поток может быть захвачен для выполнения функции. См. этот вопрос SO для решения.

Ссылки