Подтвердить что ты не робот

Как подождать завершения ThreadPoolExecutor

Мой вопрос: Как выполнить кучу поточных объектов на ThreadPoolExecutor и дождаться их завершения до перехода

Я новичок в ThreadPoolExecutor. Таким образом, этот код является испытанием, чтобы узнать, как он работает. Прямо сейчас я даже не заполняю BlockingQueue объектами, потому что я не понимаю, как запустить очередь, не вызывая execute() с другим RunnableObject. Во всяком случае, прямо сейчас я просто звоню awaitTermination(), но я думаю, что я все еще что-то пропустил. Любые советы были бы замечательными! Спасибо.

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

Класс RunnableObject:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}
4b9b3361

Ответ 1

Вы должны зациклиться на awaitTermination

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}

Ответ 2

Кажется, что проблема заключается в том, что вы не вызываете shutdown после того, как вы отправили все задания в свой пул. Без shutdown() ваш awaitTermination всегда будет возвращать false.

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

Вы также можете сделать что-то вроде следующего, чтобы дождаться завершения всех ваших заданий:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

Кроме того, поскольку вы спали за 2354 миллисекунды, но только для ожидания завершения всех заданий для 2 SECONDS, awaitTermination всегда будет возвращать false.

Наконец, похоже, что вы беспокоитесь о создании нового ThreadPoolExecutor, и вместо этого вы хотите повторно использовать первый. Не будь. Накладные расходы GC будут крайне минимальными по сравнению с любым кодом, который вы пишете, чтобы определить, завершены ли задания.


Для цитирования из javadocs, ThreadPoolExecutor.shutdown():

Инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты. Вызов не имеет дополнительного эффекта, если он уже выключен.

В методе ThreadPoolExecutor.awaitTermination(...) он ожидает, пока состояние исполнителя перейдет к TERMINATED. Но сначала состояние должно перейти в shutdown, если вызывается shutdown() или STOP, если вызывается shutdownNow().

Ответ 3

Это не имеет никакого отношения к самому исполнителю. Просто используйте интерфейс java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). Он будет блокироваться до тех пор, пока все Callable не будут завершены.

Исполнители должны быть долговечными; за пределами жизни группы задач. shutdown предназначен для завершения и очистки приложения.

Ответ 4

Другой подход - использовать CompletionService, очень полезно, если вам нужно попытаться выполнить любой результат задачи:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); //it cannot be queued more task

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}

Ответ 5

Здесь вариант с принятым ответом, который обрабатывает повторы, если/при вызове InterruptedException:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    }
}

Ответ 6

Попробуйте это,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

Линии, которые нужно добавить

ex.shutdown();
ex.awaitTermination(timeout, unit)