Подтвердить что ты не робот

Найти размер потока перед выполнением других операций

В моей программе я повторно 1collect Java 8 потоки, чтобы уменьшить совокупность объектов до одного. Размер этой коллекции может сильно различаться на протяжении всего исполнения: от 3 объектов до сотен.

public void findInterestingFoo(Stream<Foo> foos) {
    internalState.update(foos.collect(customCollector()));
}

В процессе оптимизации моего кода и поиска узких мест я сделал поток parallel в какой-то момент. Это сработало в тот момент, поскольку коллекции были довольно большими. Позже, изменив другие части и параметры программы, коллекции стали меньше. Я понял, что не сделать параллельный поток более эффективным. Это имеет смысл: накладные расходы по распределению работы над несколькими потоками для 4 объектов просто не стоят. Однако это стоит для сотен объектов.

Было бы очень удобно, если бы я мог сделать только большие потоки параллельными:

public void findInterestingFoo(Stream<Foo> foos) {
    if (isSmall(foos)) {
        internalState.update(foos.collect(customCollector()));
    } else {
        internalState.update(foos.parallel().collect(customCollector()));
    }
}

Конечно, это можно сделать вручную, когда поток создается из массива, коллекция, или вручную. То есть мы знаем, какие элементы входят в поток, поэтому это можно отслеживать. Тем не менее, я заинтересован в решении этого в общем виде, так что независимо от того, какой поток передается на findInterestingFoo, он обрабатывается надлежащим образом и максимально эффективно.

Что-то вроде count() может помочь, за исключением того, что он завершает поток, прежде чем я смогу collect it.

Мне хорошо известно, что потоки предназначены для не заданного размера, в частности:

  • Возможно неограниченное. Хотя коллекции имеют конечный размер, потоки не нужны. Операции короткого замыкания, такие как limit(n) или findFirst(), позволяют завершить вычисления в бесконечных потоках за конечное время. — java.util.stream описание пакета

Тем не менее, мне интересно, есть ли способ определить, сколько элементов находится в потоке, прежде чем выполнять какие-либо операции над ним. Неужели поток действительно не знает, что он создан из конечной коллекции?

__________
1 Тысячи раз. Оптимизация этого привела к ускорению от общего времени работы от 1,5 до 0,5 секунд в моем случае.

4b9b3361

Ответ 1

В теории вы можете сделать что-то вроде этого:

public void findInterestingFoo(Stream<Foo> foos) {
    Spliterator<Foo> sp = foos.spliterator();
    long size = sp.getExactSizeIfKnown();// returns -1 if not known
          // or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
    internalState.update(
        StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
                     .collect(customCollector()));
}

spliterator() - это операция терминала, которая потребляет входной поток, но вы можете передать Spliterator до StreamSupport.stream для создания потока с точно такими же свойствами. Второй параметр уже указывает, должен ли поток быть параллельным.

В теории.

На практике реализация текущего потока возвращает разные реализации Spliterator в зависимости от того, параллелен ли поток или нет. Это означает, что воссоздание потока в виде параллельного потока может закончиться потоком, который не способен выполнять параллельную обработку, когда исходный поток уже не был параллелен перед вызовом spliterator().

Хорошо работает, если нет промежуточных операций, например. когда вы прямо передаете Stream, созданный из коллекции или массива.

Вызов parallel() до spliterator(), чтобы получить поток, поддерживающий параллельную передачу, который может продолжаться последовательно, если вы решите это сделать, работает в во многих случаях. Однако, если во входном потоке есть промежуточные операции с выражением состояния, такие как sorted(), они могут быть исправлены для параллельной работы, даже если вы последовательно collect (или наоборот).


Другая проблема имеет фундаментальный характер. Количество элементов на самом деле не говорит, будет ли преимущество в параллельной обработке или нет. Это зависит от рабочей нагрузки на каждом элементе, что зависит не только от операции с вашим терминалом collect, но и от операций, уже привязанных к потоку, перед входом в ваш метод. Даже если вы сделаете вывод, что рабочая нагрузка ваших коллекторов уже достаточно высока, чтобы заслуживать параллельной обработки, может быть, что входящий поток имеет операции типа skip, limit или distinct (по упорядоченному потоку), которые часто ухудшаются параллельно и требуют совершенно другого порога.

Более простое решение состоит в том, чтобы позволить вызывающему абоненту решить, поскольку вызывающий абонент знает что-то о размере и характере потока. Вам даже не нужно добавлять параметр к вашей сигнатуре методов, поскольку вызывающий может уже принять решение, вызвав parallel() или sequential() в потоке, прежде чем передавать его вашему методу, и вы можете уважать это, просто не меняя режим.