Подтвердить что ты не робот

Java 8 Streams: подсчет всех элементов, которые входят в операцию терминала

Интересно, есть ли более хороший (или просто другой) подход, чтобы получить количество всех элементов, которые вводят операцию терминала в поток, а не следующее:

Stream<T> stream = ... // given as parameter
AtomicLong count = new AtomicLong();
stream.filter(...).map(...)
      .peek(t -> count.incrementAndGet())

где count.get() дает мне фактическое количество обработанных элементов на этом этапе.

Я намеренно пропустил операцию терминала, поскольку это может измениться между .forEach, .reduce или .collect. Я уже знаю .count, но, похоже, он работает хорошо, только если я обмениваю .forEach на .map и использую .count как терминальная операция. Но мне кажется, что .map используется неправильно.

Что мне не очень нравится в вышеупомянутом решении: если после него добавляется фильтр, он просто подсчитывает элементы на этом конкретном этапе, но не те, которые входят в операцию терминала.

Другой подход, который приходит мне на ум, - это collect отфильтрованные и отображенные значения в список, и работать с ним и просто вызвать list.size(), чтобы получить счет. Однако это не сработает, если сбор потока приведет к ошибке, тогда как с вышеупомянутым решением я мог бы иметь счет для всех обработанных элементов до сих пор, если соответствующий try/catch находится на месте. Это, однако, не является жестким требованием.

4b9b3361

Ответ 1

Кажется, у вас уже есть самое чистое решение через peek перед операцией терминала IMO. Единственная причина, по которой я могу думать, что это необходимо, - это отладка целей, и если это так, то для этого был разработан peek. Обтекание Stream для этого и предоставление отдельных реализаций слишком много - помимо огромного количества времени и более поздней поддержки для всего, что добавляется в Streams.

Для части того, что, если добавлен еще один фильтр? Ну, дайте комментарий к коду (многие из нас это делают) и несколько тестовых примеров, которые в противном случае могли бы потерпеть неудачу, например.


Только мои 0,02 $

Ответ 2

Лучшая идея, которая возможна, - это использовать отображение на себе и при этом подсчитывать вызов программы отображения.

steam.map(object -> {counter.incrementAndGet(); return object;});

Так как эта лямбда может быть использована повторно, и вы можете заменить любую лямбду на объект, вы можете создать объект-счетчик следующим образом:

class StreamCounter<T> implements Function<? super T,? extends T> {
  int counter = 0;
  public T apply(T object) { counter++; return object;}
  public int get() { return counter;}
}

Итак, используя:

StreamCounter<String> myCounter = new ...;
stream.map(myCounter)...
int count = myCounter.get();

Так как снова вызов карты - это еще одна точка повторного использования, метод карты может быть обеспечен путем расширения потока и переноса обычного потока.

Таким образом вы можете создать что-то вроде:

AtomicLong myValue = new AtomicLong();
...
convert(stream).measure(myValue).map(...).measure(mySecondValue).filter(...).measure(myThirdValue).toList(...);

Таким образом, вы можете просто иметь свою собственную Stream-обертку, которая прозрачно обертывает каждый поток в своей собственной версии (которая не является служебной или служебной информацией) и измеряет мощность любой такой точки измерения.

Это часто делается при анализе сложности алгоритмов при создании решений map/reduce. Расширяя реализацию потока, не беря атомный длинный экземпляр для подсчета, а только имя меры, ваша реализация потока может содержать неограниченное количество точек измерения, обеспечивая гибкий способ печати отчета.

Такая реализация может запомнить конкретную последовательность методов потока вместе с положением каждой точки измерения и выводит такие результаты, как:

list ->  (32k)map -> (32k)filter -> (5k)map -> avg(). 

Такая реализация потока записывается один раз, может использоваться для тестирования, а также для отчетности.

Встраивание в повседневную реализацию дает возможность собирать статистику для определенной обработки и допускать динамическую оптимизацию с использованием другой перестановки операций. Это будет, например, оптимизатор запросов.

Итак, в вашем случае лучше всего использовать сначала StreamCounter и в зависимости от частоты использования, количество счетчиков и близость к DRY-принципу в конечном итоге реализуют более сложное решение позже.

PS: StreamCounter использует значение int и не является потокобезопасным, поэтому в настройке параллельного потока вместо int следует заменить экземпляр AtomicInteger.