Подтвердить что ты не робот

Параллельный поток и последовательный поток

Возможно ли, что параллельный поток может дать другой результат, чем последовательный поток в Java 8? Согласно моей информации, параллельный поток такой же, как и последовательный поток, за исключением того, что он разделен на несколько подпотоков. Речь идет о скорости. Все операции над элементами выполняются, и результаты подпотоков объединяются в конце. В конце концов, результаты операций должны быть одинаковыми для параллельных и последовательных потоков, на мой взгляд. Поэтому мой вопрос: возможно ли, что этот код может дать мне другой результат? И если это возможно, почему это происходит?

int[] i = {1, 2, 5, 10, 9, 7, 25, 24, 26, 34, 21, 23, 23, 25, 27, 852, 654, 25, 58};
Double serial = Arrays.stream(i).filter(si -> {
    return si > 5;
}).mapToDouble(Double::new).map(NewClass::add).reduce(Math::atan2).getAsDouble();

Double parallel = Arrays.stream(i).filter(si -> {
    return si > 5;
}).parallel().mapToDouble(Double::new).map(NewClass::add).reduce(Math::atan2).getAsDouble();

System.out.println("serial: " + serial);
System.out.println("parallel: " + parallel);

public static double add(double i) {
    return i + 0.005;
}

и результаты:

serial: 3.6971567726175894E-23

parallel: 0.779264049587662
4b9b3361

Ответ 1

В javadoc для reduce() говорится:

Выполняет сокращение элементов этого потока с помощью функции накопления ассоциативной, [...] Функция аккумулятора должна быть ассоциативной функцией.

Слово "ассоциативный" связано с этим java-документом:

Оператор или функция op ассоциативна, если выполнено:

 (a op b) op c == a op (b op c)

Важность этого для параллельной оценки можно увидеть, если мы разложим это на четыре члена:

 a op b op c op d == (a op b) op (c op d)

Итак, мы можем оценить (a op b) параллельно с (c op d), а затем вызвать op для результатов.

Примеры ассоциативных операций включают числовое сложение, min и max и конкатенацию строк.

Как упоминается в комментарии @PaulBoddington, atan2 не является ассоциативным и поэтому недействителен для операции сокращения.


Несвязанный

Ваша последовательность потоков немного выключена. Вы должны фильтровать после параллельной операции, лямбда может быть сокращена, и вы не должны вставлять двойной:

double parallel = Arrays.stream(i)
                        .parallel()           // <-- before filter
                        .filter(si -> si > 5) // <-- shorter
                        .asDoubleStream()     // <-- not boxing
                        .reduce(Math::atan2)
                        .getAsDouble();

Ответ 2

Когда вы используете reduce с параллельным потоком, операции выполняются не в определенном порядке.

Поэтому, если вы хотите, чтобы параллельные потоки приводили к прогнозируемому результату, операция сокращения должна иметь тот же ответ, независимо от того, в каком порядке выполняются действия.

Например, сокращение использования сложения имеет смысл, поскольку добавление ассоциативно. Неважно, что из этого вы делаете, ответ 6 в обоих случаях.

(1 + 2) + 3
1 + (2 + 3)

atan2 не является ассоциативным.

Math.atan2(Math.atan2(1, 2), 3) == 0.15333604941031637

тогда

Math.atan2(1, Math.atan2(2, 3)) == 1.0392451500584097

Ответ 3

Ваш метод уменьшения дает разные результаты, если элементы указаны в разных порядках.

Итак, если вы используете параллельный поток, первоначальный порядок не гарантируется.

Если вы используете другой метод сокращения (например, (x, y) → x + y), он работает нормально.