Подтвердить что ты не робот

BlockingQueue - заблокированные методы дренажа()

В BlockingQueue есть метод, называемый drainTo(), но он не заблокирован. Мне нужна очередь, которую я хочу заблокировать, но также способную извлекать объекты в очереди одним способом.

Object first = blockingQueue.take();

if ( blockingQueue.size() > 0 )
    blockingQueue.drainTo( list );

Я думаю, что вышеприведенный код будет работать, но я ищу элегантное решение.

4b9b3361

Ответ 1

Вы ссылаетесь на комментарий в JavaDoc:

Далее, поведение этой операции undefined, если указанная коллекция изменяется во время выполнения операции.

Я считаю, что это относится к коллекции list в вашем примере:

blockingQueue.drainTo(list);

означает, что вы не можете изменить list в то же время, когда вы сбрасываете с blockingQueue в list. Тем не менее, блокирующая очередь внутренне синхронизируется так, что, когда вызывается drainTo, блокировка puts и (см. Примечание ниже). Если бы это не было сделано, тогда это было бы не по-настоящему безопасным для Thread. Вы можете посмотреть исходный код и убедиться, что drainTo является потокобезопасным относительно самой блокирующей очереди.

В качестве альтернативы, вы имеете в виду, когда вы вызываете drainTo, который хотите заблокировать, пока в очередь не будет добавлен хотя бы один объект? В этом случае у вас есть другой выбор, кроме:

list.add(blockingQueue.take());
blockingQueue.drainTo(list);

чтобы заблокировать до тех пор, пока не будет добавлен один или несколько элементов, а затем слейте всю очередь в коллекцию list.

Примечание. Что касается Java 7, для get и puts используется отдельная блокировка. Операции ввода теперь разрешены во время дренажа (и нескольких других операций).

Ответ 2

Если вы используете Google Guava, есть отличный Queues.drain() метод.

Сбрасывает очередь как BlockingQueue.drainTo(Collection, int), но если запрошенные элементы numElements недоступны, он будет ждать до указанного таймаута.

Ответ 3

Я нашел этот шаблон полезным.

List<byte[]> blobs = new ArrayList<byte[]>();
if (queue.drainTo(blobs, batch) == 0) {
   blobs.add(queue.take());
}

Ответ 4

Имея доступный API, я не думаю, что вы собираетесь стать более элегантным. Кроме того, вы можете удалить тест размера.

Если вы хотите атомарно извлекать непрерывную последовательность элементов, даже если другая операция удаления совпадает, я не верю, что даже drainTo гарантирует это.

Ответ 5

Исходный код:

 596:     public int drainTo(Collection<? super E> c) {
              //arg. check
 603:         lock.lock();
 604:         try {
 608:             for (n = 0 ; n != count ; n++) {
 609:                 c.add(items[n]);
 613:             }
 614:             if (n > 0) {
 618:                 notFull.signalAll();
 619:             }
 620:             return n;
 621:         } finally {
 622:             lock.unlock();
 623:         }
 624:     }

ArrayBlockingQueue хочет вернуть 0. Кстати, он мог сделать это, прежде чем принимать блокировку.