Подтвердить что ты не робот

Может ли функция объединителя коллектора использоваться в последовательных потоках?

Пример программы:

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}

Итак, для упрощения здесь нет окончательного преобразования, поэтому полученный код довольно прост.

Теперь IntStream.range() создает последовательный поток. Я просто помещаю результаты в Integer, а затем мои надуманные Collector собирают их в List<Integer>. Довольно просто.

И независимо от того, сколько раз я запускаю эту примерную программу, UnsupportedOperationException никогда не попадает, что означает, что мой фиктивный комбайнер никогда не вызывается.

Я как бы ожидал этого, но потом я уже неправильно понял потоки, чтобы задать вопрос...

Может ли когда-нибудь вызываться комбайнер Collector, когда поток гарантированно будет последовательным?

4b9b3361

Ответ 1

Тщательное чтение кода реализации потоков в ReduceOps.java показывает, что функция объединения вызывается только тогда, когда завершается ReduceTask, и ReduceTask экземпляры используются только при параллельной оценке конвейера. Таким образом, в текущей реализации комбайнер никогда не вызывается при оценке последовательного конвейера.

В спецификации нет ничего, что бы гарантировать это. A Collector - это интерфейс, который создает требования к его реализациям, и нет исключений, предоставляемых для последовательных потоков. Лично мне трудно представить, почему последовательная оценка конвейера может потребоваться вызвать комбайнера, но кто-то с большим воображением, чем я, может найти для этого разумное применение и реализовать его. Спецификация позволяет это, и хотя сегодня реализация не делает этого, вам все равно придется подумать об этом.

Это не должно удивлять. Проектный центр API потоков поддерживает параллельное выполнение на равных началах с последовательным выполнением. Конечно, программа может наблюдать, будет ли она выполняться последовательно или параллельно. Но дизайн API - это поддержка стиля программирования, который позволяет либо.

Если вы пишете коллекционер, и вы обнаружите, что невозможно (или неудобно или сложно) написать функцию ассоциативного объединителя, что побуждает вас ограничивать ваш поток последовательным исполнением, возможно, это означает, что вы направляетесь в неправильное направление. Пришло время немного отступить и подумать о том, чтобы подойти к проблеме по-другому.

Обычная операция стиля сокращения, которая не требует функции ассоциативного сумматора, называется fold-left. Основная характеристика заключается в том, что функция сгиба применяется строго слева направо, исходя из одного за раз. Я не знаю, как можно распараллелить fold-left.

Когда люди пытаются объединить коллекционеры так, как мы говорили, они обычно ищут что-то вроде фолд-левого. API Streams API не имеет прямой поддержки API для этой операции, но довольно легко писать. Например, предположим, что вы хотите уменьшить список строк, используя эту операцию: повторите первую строку, а затем добавьте вторую. Довольно легко показать, что эта операция не является ассоциативной:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

Выполняется последовательно, это дает желаемый результат:

aabaabcaabaabcdaabaabcaabaabcde

Но при параллельном запуске может возникнуть нечто вроде этого:

aabaabccdde

Так как он "работает" последовательно, мы можем обеспечить его выполнение, вызвав sequential() и вернемся к нему, если комбайнер выдаст исключение. Кроме того, поставщик должен вызываться ровно один раз. Невозможно объединить промежуточные результаты, поэтому, если поставщик вызывается дважды, у нас уже проблемы. Но поскольку мы "знаем", поставщик вызывается только один раз в последовательном режиме, большинство людей не беспокоится об этом. Фактически, я видел, как люди пишут "поставщики", которые возвращают какой-то существующий объект вместо создания нового, в нарушение контракта поставщика.

При этом использовании формы 3-arg collect() у нас есть две из трех функций, нарушающих их контракты. Разве это не должно говорить нам делать что-то по-другому?

Основная работа здесь выполняется с помощью функции аккумулятора. Чтобы выполнить уменьшение стиля сгиба, мы можем применить эту функцию в строгом порядке слева направо, используя forEachOrdered(). Мы должны сделать немного настройки и завершения кода до и после, но это не проблема:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

Естественно, это работает отлично параллельно, хотя преимущества параллельной работы могут быть несколько сведены на нет требованиями порядка forEachOrdered().

В общем, если вы хотите, чтобы вы выполняли изменчивое сокращение, но вам не хватает функции ассоциативного объединителя, что позволяет ограничить поток последовательным выполнением, переделать проблему как операцию с левой стороны и использовать forEachRemaining() на вашей функции аккумулятора.

Ответ 2

Как отмечалось в предыдущих комментариях @MarkoTopolnik и @Duncan, нет гарантии, что Collector.combiner() для последовательного режима вызывается для создания уменьшенного результата. Фактически, Java-документ немного субъективен в этом вопросе, что может привести к не соответствующей интерпретации.

(...) Параллельная реализация будет разбивать входные данные, создавать контейнер результатов для каждого раздела, накапливать содержимое каждого раздела в подзадачу для этого раздела, а затем использовать функцию объединителя для объединения подвыборов в комбинированный результат.

В соответствии с Комбинатор NoBlogDefFound используется только в параллельном режиме. См. Частичную цитату ниже:

combiner() используется для объединения двух аккумуляторов в один. Он используется, когда коллектор выполняется параллельно, сначала разбивая входной поток и собирающие части.

Чтобы показать более ясную эту проблему, я переписываю первый код и ставил два подхода (последовательный и параллельный).


public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {

        final Collector<Integer, ?, List<Integer>> c =
                Collector
                    .of(ArrayList::new, List::add, nope());

        // approach sequential
        Stream<Integer> sequential = IntStream
                .range(0, 10_000_000)
                .boxed();

        System.out.println("isParallel:" + sequential.isParallel());
        sequential
                .collect(c);

        // approach parallel
        Stream<Integer> parallel = IntStream
                .range(0, 10_000_000)
                .parallel()
                .boxed();

        System.out.println("isParallel:" + parallel.isParallel());
        parallel
                .collect(c);
    }
}

После запуска этого кода мы можем получить результат:

isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
    at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18)
    at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174)
    at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160)

Итак, в соответствии с этим результатом мы можем заключить, что Collector combiner можно вызывать только путем параллельного выполнения.