Какой объект синхронизации Java я должен использовать для обеспечения выполнения сколь угодно большого числа задач? Ограничения заключаются в следующем:
- Каждая задача занимает нетривиальное количество времени для завершения, и выполнение задач параллельно.
- Слишком много задач для размещения в памяти (т.е. я не могу поместить
Future
для каждой задачи вCollection
, а затем вызватьget
на всех фьючерсах). - Я не знаю, сколько задач будет (т.е. я не могу использовать
CountDownLatch
). -
ExecutorService
может быть общим, поэтому я не могу использоватьawaitTermination( long, TimeUnit )
Например, с помощью Grand Central Dispatch я могу сделать что-то вроде этого:
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
dispatch_group_enter( latch )
dispatch_async( workQueue )
{
self.processItem( item ) // method takes a non-trivial amount of time to run
dispatch_async( countUpdateQueue )
{
itemsProcessed++
}
dispatch_group_leave( latch )
}
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
Он производит вывод, который выглядит следующим образом (для 128 элементов): Processed 128 items in 1.846794962883 seconds.
Я попробовал нечто подобное с Phaser
:
final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
latch.register();
final Runnable task = new Runnable() {
public void run() {
processItem( item ); // method takes a non-trivial amount of time to run
itemsProcessed.incrementAndGet();
latch.arrive();
}
};
executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
Задачи не всегда заполняются перед последним оператором печати, и я могу получить вывод, который выглядит следующим образом (для 128 элементов): Processed 121 items in 5.296 seconds.
Является ли Phaser
даже правильным объектом? Документация указывает, что она поддерживает только 65535 сторон, поэтому мне нужно будет либо пакетные элементы для обработки, либо ввести несколько типов Phaser
.