Подтвердить что ты не робот

Java Stream API: почему различие между последовательным и параллельным режимом выполнения?

Из поток javadoc:

Потоковые конвейеры могут выполняться последовательно или параллельно. Этот режим выполнения является свойством потока. Потоки создаются с первоначальным выбором последовательного или параллельного выполнения.

Мои предположения:

  • Функциональная разница между последовательными/параллельными потоками отсутствует. На выход не влияет режим выполнения.
  • Параллельный поток всегда предпочтительнее, учитывая соответствующее количество ядер и размер проблем, чтобы оправдать накладные расходы из-за повышения производительности.
  • Мы хотим написать код один раз и работать где угодно, не заботясь об оборудовании (это, в конце концов, Java).

Предполагая, что эти допущения действительны (ничего неправильного в части мета-допущения), , какое значение имеет режим выполнения, открытый в api?

Кажется, вы должны просто объявить Stream, и выбор последовательного/параллельного выполнения должен обрабатываться автоматически в слое ниже, либо с помощью кода библиотеки, либо самого JVM как функции доступных ядер во время выполнения, размер проблемы и т.д.

Конечно, предполагая, что параллельные потоки также работают на одноядерной машине, возможно, только при использовании параллельного потока это достигается. Но это действительно уродливо - почему есть явные ссылки на параллельные потоки в моем коде, когда это опция по умолчанию?

Даже если есть сценарий, в котором вы намеренно захотите жестко закодировать использование последовательного потока - почему для этой цели существует не только под-интерфейс SequentialStream, а не загрязнение Stream выполнением переключатель режима?

4b9b3361

Ответ 1

Похоже, вы должны просто объявить поток, и выбор последовательного/параллельного выполнения должен обрабатываться автоматически в слое ниже, либо с помощью кода библиотеки, либо самого JVM как функции от ядер, доступных во время выполнения, размер проблемы и т.д.

Реальность такова: a) потоки являются библиотекой и не имеют специальной магии JVM, и б) вы не можете создать достаточно интеллектуальную библиотеку, чтобы автоматически определить, какое правильное решение в этом конкретном случае. Нет никакого разумного способа оценить, насколько дорогостоящей будет какая-то конкретная функция, даже если вы могли бы инспектировать ее реализацию, чего вы не можете - и теперь вы вводите ориентир в каждую операцию потока, пытаясь выяснить если распараллеливание будет стоить затрат на parallelism. Это просто не практично, особенно учитывая, что вы заранее не знаете, насколько плохи служебные данные parallelism.

Параллельный поток всегда предпочтительнее, учитывая соответствующее количество ядер и размер проблемы, чтобы оправдать накладные расходы из-за повышения производительности.

Не всегда, на практике. Некоторые задачи настолько малы, что их не стоит распараллеливать, а parallelism всегда имеет некоторые накладные расходы. (И, честно говоря, большинство программистов склонны переоценивать полезность parallelism, ударяя его повсюду, когда это действительно вредит производительности.)

В принципе, это довольно сложная проблема, которую вы в основном должны выталкивать на программиста.

Ответ 2

Интересный случай в этом вопросе показывает, что иногда параллельный поток может быть медленнее по порядку величины. В этом конкретном примере параллельная версия работает в течение десяти минут, в то время как последовательное занимает несколько секунд.

Ответ 3

Функциональной разницы между последовательной/параллельной потоки. Режим вывода не влияет на выход.

Существует различие между выполнением последовательных/параллельных потоков. В приведенном ниже коде TEST_2 результаты показывают, что параллельное выполнение потока выполняется намного быстрее, чем последовательный путь.

Параллельный поток всегда предпочтительнее, учитывая соответствующее количество ядер и проблему чтобы оправдать накладные расходы из-за повышения производительности.

Не совсем. если задача не является достойной (простые задачи), выполняемой в параллельных потоках, то просто мы добавляем накладные расходы на наш код. Результаты TEST_1 показывают это. Также обратите внимание, что если все рабочие потоки заняты одной задачей параллельного выполнения; то другая параллельная работа потока в другом месте вашего кода будет ждать этого.

Мы хотим писать код один раз и запускать в любом месте, не заботясь о (это Java, в конце концов).

Поскольку только программист знает; стоит ли выполнять эту задачу параллельно/последовательно, независимо от процессора. Таким образом, java API обнаружил обе опции для разработчика.

import java.util.ArrayList;
import java.util.List;

/*
 * Performance test over internal(parallel/sequential) and external iterations.
 * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
 * 
 * 
 * Parallel computing involves dividing a problem into subproblems, 
 * solving those problems simultaneously (in parallel, with each subproblem running in a separate thread),
 *  and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, 
 *  which enables you to more easily implement parallel computing in your applications. However, with this framework, 
 *  you must specify how the problems are subdivided (partitioned). 
 *  With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you.
 * 
 * Limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the -Djava.util.concurrent.ForkJoinPool.common.parallelism=1,
 *  so that the pool size is limited to one and no gain from parallelization
 *  
 *  @see ForkJoinPool
 *  https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
 *  
 *  ForkJoinPool, that pool creates a fixed number of threads (default: number of cores) and 
 *  will never create more threads (unless the application indicates a need for those by using managedBlock).
 *   *  http://stackoverflow.com/questions/10797568/what-determines-the-number-of-threads-a-java-forkjoinpool-creates
 *  
 */
public class IterationThroughStream {
    private static boolean found = false;
    private static List<Integer> smallListOfNumbers = null;
    public static void main(String[] args) throws InterruptedException {


        // TEST_1
        List<String> bigListOfStrings = new ArrayList<String>();
        for(Long i = 1l; i <= 1000000l; i++) {
            bigListOfStrings.add("Counter no: "+ i);
        }

        System.out.println("Test Start");
        System.out.println("-----------");
        long startExternalIteration = System.currentTimeMillis();
        externalIteration(bigListOfStrings);
        long endExternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for externalIteration(bigListOfStrings) is :" + (endExternalIteration - startExternalIteration) + " , and the result found: "+ found);

        long startInternalIteration = System.currentTimeMillis();
        internalIteration(bigListOfStrings);
        long endInternalIteration = System.currentTimeMillis();
        System.out.println("Time taken for internalIteration(bigListOfStrings) is :" + (endInternalIteration - startInternalIteration) + " , and the result found: "+ found);





        // TEST_2
        smallListOfNumbers = new ArrayList<Integer>();
        for(int i = 1; i <= 10; i++) {
            smallListOfNumbers.add(i);
        }

        long startExternalIteration1 = System.currentTimeMillis();
        externalIterationOnSleep(smallListOfNumbers);
        long endExternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :" + (endExternalIteration1 - startExternalIteration1));

        long startInternalIteration1 = System.currentTimeMillis();
        internalIterationOnSleep(smallListOfNumbers);
        long endInternalIteration1 = System.currentTimeMillis();
        System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :" + (endInternalIteration1 - startInternalIteration1));




        // TEST_3
        Thread t1 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t2 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t3 = new Thread(IterationThroughStream :: internalIterationOnThread);
        Thread t4 = new Thread(IterationThroughStream :: internalIterationOnThread);

        t1.start();
        t2.start();
        t3.start();
        t4.start();

        Thread.sleep(30000);
    }


    private static boolean externalIteration(List<String> bigListOfStrings) {
        found = false;
        for(String s : bigListOfStrings) {
            if(s.equals("Counter no: 1000000")) {
                found = true;
            }
        }
        return found;
    }

    private static boolean internalIteration(List<String> bigListOfStrings) {
        found = false;
        bigListOfStrings.parallelStream().forEach(
                (String s) -> { 
                    if(s.equals("Counter no: 1000000")){  //Have a breakpoint to look how many threads are spawned.
                        found = true;
                    }

                }
            );
        return found;       
    }


    private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        for(Integer s : smallListOfNumbers) {
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return found;
    }

    private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) {
        found = false;
        smallListOfNumbers.parallelStream().forEach( //Removing parallelStream() will behave as single threaded (sequential access).
                (Integer s) -> {
                    try {
                        Thread.sleep(100); //Have a breakpoint to look how many threads are spawned.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
        return found;       
    }

    public static void internalIterationOnThread() {
        smallListOfNumbers.parallelStream().forEach(
                (Integer s) -> {
                    try {
                        /*
                         * DANGEROUS
                         * This will tell you that if all the 7 FJP(Fork join pool) worker threads are blocked for one single thread (e.g. t1), 
                         * then other normal three(t2 - t4) thread wont execute, will wait for FJP worker threads. 
                         */
                        Thread.sleep(100); //Have a breakpoint here.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            );
    }
}

Ответ 4

Кажется, что вы должны просто иметь возможность объявлять поток, и выбор последовательного/параллельного выполнения должен выполняться автоматически на следующем уровне, либо с помощью библиотечного кода, либо самой JVM как функции ядер, доступных во время выполнения, размера проблемы и т.д.

Чтобы добавить к уже предоставленным ответам:

Это довольно смелое предположение. Представьте себе, что вы моделируете настольную игру для обучения некоторой форме ИИ, довольно легко распараллелить выполнение различных прохождений - просто создайте новый экземпляр и дайте ему работать в своем собственном потоке. Так как он не имеет общего состояния с другим прохождением, вам даже не нужно учитывать многопоточность в игровой логике. С другой стороны, если вы распараллелите саму игровую логику, вы получите всевозможные проблемы с многопоточностью и, скорее всего, заплатите высокую цену за сложность и даже производительность.

Контроль над поведением потоков дает вам (соответственно ограниченную) гибкость, которая сама по себе является ключевой характеристикой для хорошего дизайна библиотеки.