Подтвердить что ты не робот

Объект синхронизации для обеспечения выполнения всех задач

Какой объект синхронизации Java я должен использовать для обеспечения выполнения сколь угодно большого числа задач? Ограничения заключаются в следующем:

  • Каждая задача занимает нетривиальное количество времени для завершения, и выполнение задач параллельно.
  • Слишком много задач для размещения в памяти (т.е. я не могу поместить Future для каждой задачи в Collection, а затем вызвать get на всех фьючерсах).
  • Я не знаю, сколько задач будет (т.е. я не могу использовать CountDownLatch).
  • ExecutorService может быть общим, поэтому я не могу использовать awaitTermination( long, TimeUnit )

Например, с помощью Grand Central Dispatch я могу сделать что-то вроде этого:

let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

Он производит вывод, который выглядит следующим образом (для 128 элементов): Processed 128 items in 1.846794962883 seconds.

Я попробовал нечто подобное с Phaser:

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

Задачи не всегда заполняются перед последним оператором печати, и я могу получить вывод, который выглядит следующим образом (для 128 элементов): Processed 121 items in 5.296 seconds. Является ли Phaser даже правильным объектом? Документация указывает, что она поддерживает только 65535 сторон, поэтому мне нужно будет либо пакетные элементы для обработки, либо ввести несколько типов Phaser.

4b9b3361

Ответ 1

", чтобы обеспечить выполнение сколь угодно большого числа задач" - самый простой способ - поддерживать счетчик завершенных задач, с блокировкой операции, чтобы дождаться достижения заданного количества задач. Нет такого готового класса, но его легко сделать:

class EventCounter {
   long counter=0;

   synchronized void up () {
     counter++;
     notifyAll();
   }
   synchronized void ensure (long count) {
     while (counter<count) wait();
   }
 }

"В память слишком много задач" - поэтому процесс отправки новых задач должен быть приостановлен, когда количество запущенных задач слишком велико. Самый простой способ - рассмотреть количество запущенных задач в качестве ресурса и подсчитать его с помощью семафора:

Semaphore runningTasksSema=new Semaphore(maxNumberOfRunningTasks);
EventCounter  eventCounter =new EventCounter ();

for( final String item : fetchItems() ) {
    final Runnable task = new Runnable() {
       public void run() {
            processItem( item ); 
            runningTasksSema.release();
            eventCounter.up();
       }
    };
   runningTasksSema.aquire();
   executor.execute(task);
}

Когда поток хочет обеспечить выполнение определенного количества заданий, он вызывает:

eventCounter.ensure(givenNumberOfFinishedTasks);

Асинхронные (неблокирующие) версии операций runningTasksSema.aquire() и eventCounter.ensure() могут быть сконструированы, но они будут более сложными.

Ответ 2

Проблема с использованием Phaser в этом примере заключается в том, что CallerRunsPolicy позволяет выполнять задачу в инициирующем потоке. Таким образом, пока цикл все еще продолжается, количество прибывших сторон может равняться количеству зарегистрированных сторон, что приводит к увеличению фазы. Решение состоит в том, чтобы инициализировать Phaser с помощью 1 стороны, затем, когда цикл закончен, придите и дождитесь прибытия других сторон. Это гарантирует, что фаза не увеличивается до 1, пока все задачи не будут завершены.

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 1 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            final int arrivalPhase = latch.arrive();
        }
    };
    executor.execute( task );
}
latch.arriveAndAwaitAdvance();
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

Ответ 3

Если вы используете java8, вы можете использовать CompletableFuture

java.util.concurrent.CompletableFuture.allOf(CompletableFuture<?>... cfs)

который будет ожидать результатов всех фьючерсов в переданном массиве.