Подтвердить что ты не робот

Java 8 Stream: разница между limit() и skip()

Говоря о Stream s, когда я выполняю этот фрагмент кода

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

Я получаю этот вывод

A1B1C1
A2B2C2
A3B3C3

потому что ограничение моего потока на первые три компонента заставляет действия A, B и C выполняться только три раза.

Попытка выполнить аналогичное вычисление по последним трем элементам с помощью метода skip() показывает другое поведение: this

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .skip(6)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

выводит этот

A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9

Почему в этом случае выполняются действия от A1 до A6? Это должно иметь какое-то отношение к тому, что предел является короткой замыкающей промежуточной операцией состояния, в то время как пропустить нет, но я не понимаю практические последствия этого свойства. Это просто, что "каждое действие перед пропуском выполняется, пока не все до предела"?

4b9b3361

Ответ 1

Здесь у вас есть два потоковых конвейера.

Эти потоковые конвейеры состоят из источника, нескольких промежуточных операций и операции с терминалом.

Но промежуточные операции ленивы. Это означает, что ничего не происходит, если для операции нисходящего потока не требуется элемент. Когда это происходит, промежуточная операция делает все необходимое для создания требуемого элемента, а затем снова ждет, пока не будет запрошен другой элемент, и т.д.

Операции с терминалами обычно "нетерпеливы". То есть, они запрашивают все элементы в потоке, которые необходимы для их завершения.

Итак, вы должны действительно думать о конвейере как forEach, запрашивая поток за ним для следующего элемента, и этот поток запрашивает поток позади него и т.д. вплоть до источника.

С учетом этого, посмотрим, что у нас есть с вашим первым конвейером:

Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));

Итак, forEach запрашивает первый элемент. Это означает, что "B" peek нуждается в элементе и запрашивает для него выходной поток limit, что означает, что limit нужно будет запросить "A" peek, который идет к источнику. Элемент задан и доходит до forEach, и вы получите свою первую строку:

A1B1C1

forEach запрашивает другой элемент, затем другой. И каждый раз запрос распространяется вверх по потоку и выполняется. Но когда forEach запрашивает четвертый элемент, когда запрос попадает в limit, он знает, что он уже предоставил все элементы, которые ему разрешено дать.

Таким образом, он не запрашивает "A" для другого элемента. Он сразу указывает, что его элементы исчерпаны, и, таким образом, больше действий не выполняется и forEach завершается.

Что происходит во втором конвейере?

    Stream.of(1,2,3,4,5,6,7,8,9)
    .peek(x->System.out.print("\nA"+x))
    .skip(6)
    .peek(x->System.out.print("B"+x))
    .forEach(x->System.out.print("C"+x));

Опять же, forEach запрашивает первый элемент. Это распространяется обратно. Но когда он добирается до skip, он знает, что он должен попросить 6 предметов из своего восходящего потока, прежде чем он сможет передать один ниже по течению. Таким образом, он делает запрос вверх по течению от "A" peek, потребляет его, не передавая его вниз по течению, делает другой запрос и так далее. Таким образом, просмотр "A" получает 6 запросов к элементу и производит 6 отпечатков, но эти элементы не передаются.

A1
A2
A3
A4
A5
A6

В 7-м запросе, сделанном skip, элемент передается на "B" и от него до forEach, поэтому выполняется полная печать:

A7B7C7

Тогда это точно так же, как раньше. skip теперь, всякий раз, когда он получает запрос, запрашивает элемент вверх и передает его вниз по течению, так как он "знает", что уже выполнил свою работу по пропуску. Таким образом, остальные отпечатки проходят через всю трубу, пока источник не исчерпается.

Ответ 2

Свободная запись потокового трубопровода - вот что вызывает эту путаницу. Подумайте об этом так:

limit(3)

Все конвейерные операции оцениваются лениво, кроме forEach(), который является операцией терминала , которая запускает "выполнение конвейера".

Когда выполняется конвейер, определения промежуточного потока не будут делать никаких предположений о том, что происходит "до" или "после". Все, что они делают, это взять входной поток и преобразовать его в выходной поток:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 содержит 9 различных значений Integer.
  • s2 просматривает все значения, которые передают его и печатает.
  • s3 передает первые 3 значения в s4 и прерывает конвейер после третьего значения. Никакие дополнительные значения не вырабатываются s3. Это не означает, что больше нет значений в конвейере. s2 все равно будет выдавать (и печатать) больше значений, но никто не запрашивает эти значения и, следовательно, выполнение останавливается.
  • s4 снова заглядывает во все значения, которые передают его и печатает.
  • forEach потребляет и печатает все, что s4 переходит к нему.

Подумайте об этом так. Весь поток полностью ленив. Только операция терминала активно извлекает новые значения из конвейера. После того, как он вытащил 3 значения из s4 <- s3 <- s2 <- s1, s3 больше не будет производить новые значения и больше не будет вытаскивать значения из s2 <- s1. В то время как s1 -> s2 все еще сможет создавать 4-9, эти значения просто не извлекаются из конвейера и, следовательно, никогда не будут печататься с помощью s2.

skip(6)

С skip() происходит то же самое:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 содержит 9 различных значений Integer.
  • s2 просматривает все значения, которые передают его и печатает.
  • s3 потребляет первые 6 значений, "пропуская их", что означает, что первые 6 значений не передаются на s4, а только следующие значения.
  • s4 снова заглядывает во все значения, которые передают его и печатает.
  • forEach потребляет и печатает все, что s4 переходит к нему.

Важно то, что s2 не знает о том, что оставшийся конвейер пропускает любые значения. s2 заглядывает во все значения независимо от того, что происходит потом.

Другой пример:

Рассмотрим этот конвейер, который указан в этом сообщении в блоге

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .distinct()
         .limit(10)
         .forEach(System.out::println);

Когда вы выполните вышеуказанное, программа никогда не остановится. Зачем? Потому что:

IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);

i3.forEach(System.out::println);

Это означает:

  • i1 генерирует бесконечное количество переменных значений: 0, 1, 0, 1, 0, 1,...
  • i2 потребляет все значения, которые были встречены ранее, передавая только "новые" значения, то есть есть всего 2 значения, выходящих из i2.
  • i3 передает 10 значений, затем останавливается.

Этот алгоритм никогда не будет останавливаться, потому что i3 ждет i2 для получения еще 8 значений после 0 и 1, но эти значения никогда не появляются, а i1 никогда не перестает подавать значения до i2.

Не имеет значения, что в какой-то момент в трубопроводе было произведено более 10 значений. Все, что имеет значение, состоит в том, что i3 никогда не видел этих 10 значений.

Чтобы ответить на ваш вопрос:

Это просто, что "каждое действие перед пропуском выполняется, пока не все до предела"?

Неа. Все операции перед тем, как выполняется skip() или limit(). В обоих ваших исполнениях вы получите A1 - A3. Но limit() может привести к короткому замыканию конвейера, прервав потребление стоимости после того, как произойдет интересное событие (достигнут предел).

Ответ 3

Это полное богохульство, чтобы смотреть на паровые операции индивидуально, потому что это не то, как оценивается поток.

Говоря о предел (3), это операция короткого замыкания, что имеет смысл, потому что, думая об этом, любая операция до и после strong > limit, имеющий ограничение в потоке, остановит итерацию после получения n элементов до предельной операции, но это не означает, что будут обрабатываться только n элементов потока. Возьмите эту другую операцию потока для примера

public class App 
{
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .filter(x -> x%2==0)
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

выводит

A1
A2B2C2
A3
A4B4C4
A5
A6B6C6

которые кажутся правильными, поскольку предел ожидает, что три элемента потока пройдут через цепочку операций, хотя обрабатываются 6 элементов потока.

Ответ 4

Все потоки основаны на разделителях, которые имеют в основном две операции: продвижение (перемещение вперед одного элемента, похожее на итератор) и разделение (деление на произвольное положение, подходящее для параллельной обработки). Вы можете прекратить принимать входные элементы в любой момент, который вам нравится (это делается с помощью limit), но вы не можете просто перейти к произвольной позиции (там нет такой операции в интерфейсе Spliterator). Таким образом, операция skip должна действительно читать первые элементы из источника, чтобы игнорировать их. Обратите внимание, что в некоторых случаях вы можете выполнить фактический скачок:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);

list.stream().skip(3)... // will read 1,2,3, but ignore them
list.subList(3, list.size()).stream()... // will actually jump over the first three elements