Подтвердить что ты не робот

Вычисление с учетом состояния потока: совокупные суммы

Предполагая, что у меня есть Java IntStream, можно ли преобразовать его в IntStream с суммарными суммами? Например, поток, начинающийся с [4, 2, 6,...], должен быть преобразован в [4, 6, 12,...].

В более общем плане, как следует идти об осуществлении операций с потоком состояний? Кажется, это должно быть возможно:

myIntStream.map(new Function<Integer, Integer> {
    int sum = 0; 
    Integer apply(Integer value){ 
        return sum += value; 
    }
);

С очевидным ограничением, что это работает только на последовательные потоки. Однако Stream.map явно требует функцию отображения без сохранения. Правильно ли я теряю поток Stream.statefulMap или Stream.cumulative или не хватает точки потоков Java?

Сравните, например, с Haskell, где функция scanl1 решает именно этот пример:

scanl1 (+) [1 2 3 4] = [1 3 6 10]
4b9b3361

Ответ 1

Вы можете сделать это с помощью атомного номера. Например:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}

Выводится:

1
3
6
10

Я использовал Long для хранения сумм, потому что вполне возможно, что два ints добавляются до уровня Integer.MAX_VALUE, а у long меньше вероятность переполнения.

Ответ 2

Это возможно сделать с коллектором, который затем создает новый поток:

class Accumulator {
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        int total = list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}

Это используется как:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();

Надеюсь, вы увидите, что важным атрибутом этого коллектора является то, что даже если поток параллелен при объединении экземпляров Accumulator, он корректирует итоговые значения.

Это, очевидно, не так эффективно, как операция с картой, потому что он собирает весь поток и затем создает новый поток. Но это не просто деталь реализации: это необходимая функция того, что потоки предназначены для потенциальной параллельной обработки.

Я тестировал его с помощью IntStream.range(0, 10000).parallel(), и он работает правильно.

Ответ 3

Следующий код поможет вам сделать это.

public static void main(String[] args) {
  List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);

  Integer[] previous = {0};
  List<Integer> data = integers.stream().map(no -> no + previous[0]).map(no -> previous[0] = no)
      .collect(Collectors.toList());

  System.out.println(data);
}