Подтвердить что ты не робот

Java ExecutorService: waitTermination всех рекурсивно созданных задач

Я использую ExecutorService для выполнения задачи. Эта задача может рекурсивно создавать другие задачи, которые передаются в один и тот же ExecutorService, и эти дочерние задачи тоже могут это сделать.

У меня теперь есть проблема, что я хочу подождать, пока все задачи не будут выполнены (то есть все задачи закончены, и они не отправят новые), прежде чем продолжить.

Я не могу вызвать ExecutorService.shutdown() в основном потоке, потому что это предотвращает принятие новых задач с помощью ExecutorService.

И Вызов ExecutorService.awaitTermination() кажется, ничего не делает, если shutdown не был вызван.

Таким образом, я немного застрял здесь. Это не может быть так трудно для ExecutorService видеть, что все рабочие бездействуют, не так ли? Единственное неэффективное решение, которое я мог бы придумать, - это прямое использование ThreadPoolExecutor и запрос его getPoolSize() каждый раз в то время. Неужели нет лучшего способа сделать это?

4b9b3361

Ответ 1

Если количество задач в дереве рекурсивных задач изначально неизвестно, возможно, самым простым способом было бы реализовать свой собственный примитив синхронизации, какой-то "обратный семафор" и поделиться им между вашими задачами. Перед отправкой каждой задачи вы увеличиваете значение, когда задача завершается, она уменьшает это значение, и вы ждете, пока значение не будет 0.

Реализация его как отдельного примитива, явно вызванного из задач, отделяет эту логику от реализации пула потоков и позволяет вам представить несколько независимых деревьев рекурсивных задач в один и тот же пул.

Что-то вроде этого:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

Обратите внимание, что taskCompleted() следует вызывать внутри блока finally, чтобы он не зависел от возможных исключений.

Также обратите внимание, что beforeSubmit() должен вызываться подающим потоком перед тем, как задача будет отправлена, а не самой задачей, чтобы избежать возможного "ложного завершения", когда старые задачи завершены, а новые еще не запущены.

EDIT: Важная проблема с фиксированным шаблоном использования.

Ответ 2

Это действительно идеальный кандидат на Phaser. Java 7 выходит с этим новым классом. Его гибкий CountdonwLatch/CyclicBarrier. Вы можете получить стабильную версию на JSR 166 Interest Site.

Способ, которым он является более гибким CountdownLatch/CyclicBarrier, заключается в том, что он способен не только поддерживать неизвестное количество сторон (потоков), но и его повторно использовать (то есть, где входит фазовая часть)

Для каждой заданной вами задачи вы должны зарегистрироваться, когда эта задача будет завершена, вы приедете. Это можно сделать рекурсивно.

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

Изменить: Спасибо @depthofreality за указание условия гонки в моем предыдущем примере. Я обновляю его так, чтобы исполняемый поток только ожидал перехода к текущей фазе, когда он блокирует выполнение рекурсивной функции.

Номер фазы не будет отключен до номера arrive == register s. Поскольку перед каждым рекурсивным вызовом вызывается register, инкремент фазы будет выполняться, когда все вызовы завершены.

Ответ 3

Ничего себе, вы, ребята, быстро:)

Спасибо за все предложения. Фьючерсы нелегко интегрируются с моей моделью, потому что я не знаю, сколько runnables запланировано заранее. Поэтому, если я оставлю родительскую задачу в ожидании завершения рекурсивных дочерних задач, у меня много мусора.

Я решил свою проблему, используя предложение AtomicInteger. По сути, я подклассифицировал ThreadPoolExecutor и увеличил счетчик на вызовы execute() и уменьшил на вызовы afterExecute(). Когда счетчик получает 0, я вызываю shutdown(). Это, похоже, работает на мои проблемы, не уверен, что это вообще хороший способ сделать это. В частности, я предполагаю, что вы используете только execute() для добавления Runnables.

В качестве стороны node: я сначала попытался проверить afterExecute() количество Runnables в очереди и число рабочих, которые активны и завершены, когда они равны 0; но это не сработало, потому что не все Runnables появились в очереди, и getActiveCount() не делал того, что ожидал.

Во всяком случае, здесь мое решение: (если кто-то найдет серьезные проблемы с этим, сообщите мне:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}

Ответ 4

Вы можете создать свой собственный пул потоков, который расширяет ThreadPoolExecutor. Вы хотите знать, когда была отправлена ​​задача и когда она завершается.

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}

Ответ 5

Используйте Future для своих задач (вместо отправки Runnable), обратный вызов обновляет его, когда он будет завершен, поэтому вы можете использовать Future.isDone, чтобы отслеживать состояние всех ваших задач.

Ответ 6

Используйте CountDownLatch. Передайте объект CountDownLatch для каждой из ваших задач и запрограммируйте свои задачи, как показано ниже.

public void doTask() {
    // do your task
    latch.countDown(); 
}

В то время как поток, который должен ждать, должен выполнить следующий код:

public void doWait() {
    latch.await();
}

Но, конечно, это предполагает, что вы уже знаете количество дочерних задач, чтобы вы могли инициализировать счет защелки.

Ответ 7

(mea culpa: его "бит" мимо моего сна;), но здесь первая попытка динамической защелки):

package oss.alphazero.sto4958330;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DynamicCountDownLatch {
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final CountDownLatch toplatch;
        public Sync() {
            setState(0);
            this.toplatch = new CountDownLatch(1);
        }

        @Override
        protected int tryAcquireShared(int acquires){
            try {
                toplatch.await();
            } 
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted", e);
            }
            return getState() == 0 ? 1 : -1;
        }
        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) 
                    return nextc == 0;
            }
        }
        public boolean tryExtendState(int acquires) {
            for (;;) {
                int s = getState();
                int exts = s+1;
                if (compareAndSetState(s, exts)) {
                    toplatch.countDown();
                    return exts > 0;
                }
            }
        }
    }
    private final Sync sync;
    public DynamicCountDownLatch(){
        this.sync = new Sync();
    }
    public void await() 
        throws InterruptedException   
    {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit   unit) 
        throws InterruptedException   
    {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void join() {
        sync.tryExtendState(1);
    }
}

Эта защелка вводит новый метод join() в существующий (клонированный) API CountDownLatch, который используется задачами, чтобы сигнализировать о входе в большую группу задач.

Защелка проходит от родительской задачи к дочерней задаче. Каждая задача должна была бы по одному шаблону Suraj сначала "join()" защелку, выполнить свою задачу(), а затем countDown().

Чтобы устранить ситуации, когда основной поток запускает группу задач, а затем сразу ждет() - до того, как какой-либо из потоков задач имел возможность даже присоединиться() - используется topLatch int inner Sync класс. Это защелка, которая будет подсчитываться по каждому соединению(); Конечно, только первый обратный отсчет является значительным, так как все последующие - nops.

В начальной реализации выше вводится семантическая морщина, потому что tryAcquiredShared (int) не должен бросать InterruptedException, но тогда нам нужно иметь дело с прерыванием в ожидании на topLatch.

Это улучшение по сравнению с собственным решением OP с использованием счетчиков Atomic? Я бы сказал, что, вероятно, не IFF, он настаивает на использовании Executors, но я считаю, что такой же приемлемый альтернативный подход с использованием AQS в этом случае, а также применим к генерическим потокам.

Откажитесь от других хакеров.

Ответ 8

Если вы хотите использовать классы JSR166y - например, Phaser или Fork/Join - любой из них может работать для вас, вы всегда можете загрузить их backback из них: http://gee.cs.oswego.edu/dl/concurrency-interest/ и использовать что в качестве основы, а не для написания полностью доморощенного решения. Затем, когда выйдет 7, вы можете просто сбросить зависимость от backport и изменить несколько имен пакетов.

(Полное раскрытие: мы использовали LinkedTransferQueue в prod на некоторое время сейчас. Нет проблем)

Ответ 9

Я должен сказать, что решения, описанные выше проблемы с рекурсивной вызывающей задачей и ожидания задач конечного подотряда, не удовлетворяют меня. Есть мое решение, вдохновленное оригинальной документацией от Oracle: CountDownLatch и пример там: Человеческие ресурсы CountDownLatch.

Первый общий поток в процессе, например класс HRManagerCompact, имеет задержку ожидания для двух дочерних потоков, у которых есть ожидающие защелки для последующих двух дочерних потоков... и т.д.

Конечно, защелка может быть установлена ​​на другое значение, чем 2 (в конструкторе CountDownLatch), а также количество запущенных объектов может быть установлено в итерации, то есть в ArrayList, но оно должно соответствовать (количество обратных отсчетов должно быть равный параметру в конструкторе CountDownLatch).

Будьте осторожны, количество защелок увеличивается экспоненциально в соответствии с условием ограничения: 'level.get() < 2 ', а также количество объектов. 1, 2, 4, 8, 16... и защелки 0, 1, 2, 4... Как вы можете видеть, для четырех уровней (level.get() < 4) будет 15 ожидающих потоков и 7 защелки во времени, когда работают пиковые пики 16.

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  /*catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }*/
  // TODO Auto-generated method stub
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

Возможный комментарий (с некоторой вероятностью):

first: working... 1, 1, 10 // thread 1, first daughter task (10)
second: working... 1, 1, 11 // thread 1, second daughter task (11)
first: working... 2, 10, 12 // thread 10, first daughter task (12)
first: working... 2, 11, 14 // thread 11, first daughter task (14)
second: working... 2, 11, 15 // thread 11, second daughter task (15)
second: working... 2, 10, 13 // thread 10, second daughter task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened

Ответ 10

Единственное нечеткое решение, которое я мог бы придумать, - это напрямую использовать ThreadPoolExecutor и запрашивать его getPoolSize() каждый раз. Неужели нет лучшего способа сделать это?

Вы должны использовать методы shutdown() , awaitTermination() and shutdownNow() в правильной последовательности.

shutdown(): инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты.

awaitTermination(): Блокирует, пока все задачи не завершили выполнение после запроса на завершение работы или не произойдет тайм-аут, или текущий поток не будет прерван, в зависимости от того, что произойдет раньше.

shutdownNow(): пытается остановить все активное выполнение задач, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.

Рекомендуемый способ из страницы документации оракула ExecutorService:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

Вы можете заменить условие с условием while в случае продолжительности выполнения задач, как показано ниже:

Измените

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

Для

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

Вы можете ссылаться на другие альтернативы (кроме join(), которые могут использоваться с автономным потоком):

дождитесь, пока все потоки закончат работу в java

Ответ 11

Вы можете использовать бегун, который отслеживает запущенные потоки:

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

чтобы использовать его, просто добавьте зависимость:

скомпилируйте 'com.github.matejtymes: javafixes: 1.3.0'