Подтвердить что ты не робот

Можно ли дублировать поток в Java 8?

Иногда я хочу выполнить набор операций над потоком, а затем обработать результирующий поток двумя разными способами с помощью других операций.

Могу ли я сделать это, не указывая общие начальные операции дважды?

Например, я надеюсь, что существует метод dup(), такой как следующее:

Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup();
Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10
4b9b3361

Ответ 1

В общем случае это невозможно.

Если вы хотите дублировать входной поток или ввести итератор, у вас есть два варианта:

A. Храните все в коллекции, скажем, List<>

Предположим, вы дублируете поток в два потока s1 и s2. Если у вас есть расширенные элементы n1 в s1 и n2 элементах с s2, вы должны сохранить |n2 - n1| элементы в памяти, просто чтобы идти в ногу. Если ваш поток бесконечен, для хранения не может быть верхней границы.

Взгляните на Python tee(), чтобы узнать, что нужно:

Этот itertool может потребовать значительного вспомогательного хранилища (в зависимости от того, сколько временных данных необходимо сохранить). В общем случае, если один итератор использует большинство или все данные перед запуском другого итератора, быстрее использовать list() вместо tee().

В. Когда это возможно: скопируйте состояние генератора, который создает элементы

Чтобы эта опция работала, вам, вероятно, понадобится доступ к внутренней работе потока. Другими словами, генератор - часть, которая создает элементы - должна поддерживать копирование в первую очередь. [OP: см. отличный ответ, как пример того, как это можно сделать для примера в вопросе]

Он не будет работать на входе пользователя, так как вам придется копировать состояние всего "внешнего мира". Java Stream не поддерживает копирование, поскольку он разработан как можно более общий, в частности, для работы с файлами, сетью, клавиатурой, датчиками, случайностью и т.д. [OP: Еще один пример - поток, который считывает температурный датчик по требованию, Он не может быть дублирован без сохранения копии показаний]

Это не только случай Java; это общее правило. Вы можете видеть, что std::istream в С++ поддерживает только семантику перемещения, а не семантику копирования ( "copy constructor (deleted)" ), по этой причине (и другие).

Ответ 2

Невозможно дублировать поток таким образом. Однако вы можете избежать дублирования кода, перемещая общую часть в метод или выражение лямбда.

Supplier<IntStream> supplier = () ->
    IntStream.range(1, 100).filter(n -> n % 2 == 0);
supplier.get().filter(...);
supplier.get().filter(...);

Ответ 3

Возможно, если вы буферизируете элементы, которые вы использовали в одном дубликате, но не в другом.

Мы реализовали метод duplicate() для потоков в jOOλ, библиотеке с открытым исходным кодом, которую мы создали для улучшения тестирования интеграции jOOQ. По существу, вы можете просто написать:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate();

(обратите внимание: в настоящее время нам нужно вставить поток, поскольку мы еще не реализовали IntSeq)

Внутри есть буфер LinkedList, сохраняющий все значения, которые были израсходованы из одного потока, но не из другого. Это, вероятно, так же эффективно, как и в случае, если ваши два потока потребляются с одинаковой скоростью.

Вот как работает алгоритм:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

Дополнительный исходный код здесь

Фактически, используя jOOλ, вы сможете написать полный однострочный слой следующим образом:

Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq(
    IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed()
).duplicate()
 .map1(s -> s.filter(n -> n % 7 == 0))
 .map2(s -> s.filter(n -> n % 5 == 0));

// This will yield 14, 28, 42, 56...
desired_streams.v1.forEach(System.out::println)

// This will yield 10, 20, 30, 40...
desired_streams.v2.forEach(System.out::println);

Ответ 4

Обновление: Этот не работает. См. Объяснение ниже, после текста исходного ответа.

Как глупо со мной. Все, что мне нужно сделать, это:

Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0);
Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14
Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10

Объяснение, почему это не работает:

Если вы закодируете его и попытаетесь собрать оба потока, первый будет собирать штраф, но при попытке потока второй будет генерироваться исключение: java.lang.IllegalStateException: stream has already been operated upon or closed.

Чтобы разработать, потоки представляют собой объекты с сохранением состояния (которые, кстати, не могут быть reset или перематывать). Вы можете считать их итераторами, которые, в свою очередь, похожи на указатели. Поэтому stream14 и stream10 можно рассматривать как ссылки на один и тот же указатель. Потребление первого потока полностью приведет к тому, что указатель будет идти "мимо конца". Попытка использовать второй поток - это попытка получить доступ к указателю, который уже "прошел за конец", что, естественно, является незаконной операцией.

Как показывает принятый ответ, код для создания потока должен выполняться дважды, но его можно разделить на lambda Supplier или аналогичную конструкцию.

Полный тестовый код: сохранить в Foo.java, затем javac Foo.java, затем java Foo

import java.util.stream.IntStream;

public class Foo {
  public static void main (String [] args) {
    IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0);
    IntStream s1 = s.filter(n -> n % 5 == 0);
    s1.forEach(n -> System.out.println(n));
    IntStream s2 = s.filter(n -> n % 7 == 0);
    s2.forEach(n -> System.out.println(n));
  }
}

Вывод:

$ javac Foo.java
$ java Foo
0
10
20
30
40
50
60
70
80
90
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203)
    at java.util.stream.IntPipeline.<init>(IntPipeline.java:91)
    at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592)
    at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332)
    at java.util.stream.IntPipeline.filter(IntPipeline.java:331)
    at Foo.main(Foo.java:8)

Ответ 5

Либо,

  • Переместите инициализацию в метод и просто вызовите метод снова

Это имеет то преимущество, что явное описание того, что вы делаете, а также работает для бесконечных потоков.

  • Соберите поток, а затем повторно введите его

В вашем примере:

final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray();

Тогда

final IntStream s = IntStream.of(arr);

Ответ 6

Вы также можете переместить генерацию потока в отдельный метод/функцию, которая возвращает этот поток и вызывает его дважды.