Подтвердить что ты не робот

Удаление всех задач в очереди ThreadPoolExecutor

У меня есть довольно простой вопрос о ThreadPoolExecutor. У меня есть следующая ситуация: я должен уничтожать объекты из очереди, создавать для них подходящие рабочие задачи и отправлять их в ThreadPoolExecutor. Это довольно просто. Но в сценарии завершения работы многие рабочие могут быть поставлены в очередь на выполнение. Поскольку одна из этих задач может выполняться в течение часа, и я хочу, чтобы релятививно быстрая изящная остановка приложения, я хочу отбросить все задачи в очереди из ThreadPoolExecutor, в то время как уже обрабатываемые задачи должны быть выполнены нормально.

Документация ThreadPoolExecutor имеет метод remove(), но только позволяет удалить определенные задачи. purge() работает только для отмененных будущих задач. Моя идея состояла в том, чтобы очистить очередь, содержащую все задачи в очереди. ThreadPoolExecutor предоставляет доступ к этой внутренней очереди, но в документации указано:

Метод getQueue() позволяет получить доступ к рабочая очередь для целей мониторинга и отладки. Использование этого метода для любой другой цели не рекомендуется.

Таким образом, захват этой очереди и очистка это не вариант. Кроме того, в этом фрагменте документации говорится:

Два поставляемых метода, удалить (java.lang.Runnable) и очистить() доступны для помощи в хранении рекультивации, когда большое количество задачи в очереди будут отменены.

Как? Конечно, я могу сохранить список всех задач, которые я представил исполнителю, и в случае завершения работы итерации по всем записям и удалить их из ThreadPoolExecutor методом remove()... но... давайте, это потери памяти и хлопоты для поддержания этого списка. (Например, удаление уже выполненных задач)

Я ценю любые подсказки или решения!

4b9b3361

Ответ 1

Рассматривали ли вы упаковку ExecutorService? Создайте

CleanShutdownExecutorService implements Executor 

который делегирует все вызовы другому Исполнителю, но сохраняет фьючерсы в своем собственном списке. Затем CleanShutdownExecutorService может иметь метод cancelRemainingTasks(), который вызывает shutdown(), а затем вызывает отмену (false) для всех фьючерсов в своем списке.

Ответ 2

Я работал над приложением с длинными потоками. Мы делаем это при выключении,

BlockingQueue<Runnable> queue = threadPool.getQueue();
List<Runnable> list = new ArrayList<Runnable>();
int tasks = queue.drainTo(list);

Список сохраняется в файле. При запуске список добавляется обратно в пул, поэтому мы не теряем никаких заданий.

Ответ 3

Как ExecutorService.shutdown() недостаточно, и ExecutorService.shutdownNow() делает слишком много, я думаю, вам нужно что-то написать в середине: запомните все ваши отправленные задачи и удалите их вручную после (или до) вызова shutdown().

Ответ 4

Это старый вопрос, но в случае, если это помогает кому-то еще: вы можете установить volatile boolean, когда вы вызываете shutdown(), и чтобы каждая отправленная задача завершалась, если это логическое значение задано перед запуском. Это позволит выполнять задачи, которые действительно начали завершаться, но предотвратит запуск заданий в очереди с их фактической деятельностью.

Ответ 5

Ответ Bombe - именно то, что вы хотите. shutdownNow() останавливает все, используя nuke и прокладывая подход. Это лучшее, что вы можете сделать, за исключением того, что вы используете подкласс ThreadPoolExecutor, который вы используете.

Ответ 6

Вы можете попробовать allowCoreThreadTimeOut(true);

Ответ 7

Сумасшедшее и нечистое решение, которое могло бы работать (не продумано или проверено), было бы перезаписать interrupt() ваших WorkerTasks, которые только в случае, если какое-либо глобальное значение установлено, отказываются от выключения, когда на них вызывается interrupt() by shutdownNow().

Это должно позволить вам использовать shutdownNow() no?

Ответ 8

Сообщите пул потоков к завершению, getQueue, для каждого результата в отдельные Runnables, удалите каждый Runnable с помощью метода remove. В зависимости от типа очереди вы можете приостановить удаление раньше, основываясь на возвращаемых значениях.

В принципе, это захват очереди и ее очистка, только очистка с помощью методов, которые работают. Вместо того, чтобы вручную запоминать все материалы, вы используете тот факт, что пул потоков уже должен помнить все материалы. Однако вам, вероятно, понадобится сделать защитную копию очереди, так как я думаю, что это живое представление, и поэтому удаление, вероятно, вызовет исключение параллельной модификации, если вы выполняете итерацию/для каждого в реальном времени.

Ответ 9

Не работает ли awaitTermination(long timeout, TimeUnit unit) после выключения?

executor.shutdown(); executor.awaitTermination(60, TimeUnit.SECONDS)