Подтвердить что ты не робот

Гибкий CountDownLatch?

Я столкнулся с проблемой дважды, когда поток производителя создает N рабочих элементов, передает их в ExecutorService, а затем должен ждать, пока не обработаются все N элементов.

Предостережение

  • N заранее не известно. Если бы это было, я просто создал бы CountDownLatch, а затем создавал поток await(), пока не завершится вся работа.
  • Использование CompletionService неуместно, потому что хотя мой поток производителя должен блокировать (т.е. вызывая take()), не указывать, что все работы завершены, чтобы поток производителя остановка ожидания.

Мое последнее предпочтительное решение - использовать целочисленный счетчик и приращение, когда каждый элемент работы отправляется, а декремент - при обработке рабочего элемента. После подбора всех задач N мой поток производителя должен будет ждать блокировки, проверяя, будет ли counter == 0 всякий раз, когда он уведомляется. Потребительский поток должен будет уведомить производителя, если он уменьшил счетчик, а новое значение равно 0.

Есть ли лучший подход к этой проблеме или есть подходящая конструкция в java.util.concurrent, которую я должен использовать, а не "сворачивать свои собственные"?

Спасибо заранее.

4b9b3361

Ответ 1

java.util.concurrent.Phaser похоже, что это сработает хорошо для вас. Планируется выпуск в Java 7, но наиболее стабильную версию можно найти на веб-сайте jsr166.

Фазер - это прославленный Циклический Барьер. Вы можете зарегистрировать N количество сторон и когда вы готовы дождаться своего продвижения на определенной фазе.

Быстрый пример того, как он будет работать:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}

Ответ 2

Конечно, вы можете использовать CountDownLatch, защищенный AtomicReference, чтобы ваши задачи были обернуты таким образом:

public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

Обратите внимание на, что задачи запускают свою работу, а затем вращаются до тех пор, пока не будет установлен обратный отсчет (т.е. общее количество заданий известно). Как только обратный отсчет установлен, они подсчитывают его и выходят. Они передаются следующим образом:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

После точки создания/представления вашей работы, когда вы знаете, сколько задач вы создали:

latch.set(new CountDownLatch(nTasks));
latch.get().await();

Ответ 3

Я использовал ExecutorCompletionService для чего-то вроде этого:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

Это включает в себя сохранение очереди заданий, которые были отправлены, поэтому, если ваш список произвольно длинный, ваш метод может быть лучше.

Но не забудьте использовать AtomicInteger для согласования, чтобы вы могли увеличивать его в одном потоке и уменьшать его в рабочих потоках.

Ответ 4

Я предполагаю, что вашему продюсеру не нужно знать, когда очередь пуста, но должна знать, когда последняя задача была завершена.

Я бы добавил к waitforWorkDone(producer) метод waitforWorkDone(producer). Производитель может добавить свои N задачи и вызвать метод wait. Метод wait блокирует входящий поток, если рабочая очередь не пуста и в настоящий момент не выполняются задачи.

Потребительские потоки notifyAll() в блокировке waitfor, если его задача завершена, очередь пуста и никакая другая задача не выполняется.

Ответ 6

То, что вы описали, очень похоже на использование стандартного семафора, но используется "назад".

  • Семафор начинается с 0 разрешений
  • Каждый рабочий модуль освобождает одно разрешение, когда оно завершается
  • Вы блокируете, ожидая получения разрешений N.

Кроме того, у вас есть возможность только приобрести M < N, что полезно, если вы хотите проверить промежуточное состояние. Например, я тестирую асинхронную ограниченную очередь сообщений, поэтому я ожидаю, что очередь будет заполнена для некоторого M < N, поэтому я могу получить M и проверить, действительно ли очередь заполнена, а затем получить оставшиеся разрешения N-M после того, как вы отправили сообщения из очереди.