Подтвердить что ты не робот

Java sum two double [] [] с параллельным потоком

Скажем, у меня есть две матрицы:

double[][] a = new double[2][2]
a[0][0] = 1
a[0][1] = 2
a[1][0] = 3
a[1][1] = 4

double[][] b = new double[2][2]
b[0][0] = 1
b[0][1] = 2
b[1][0] = 3
b[1][1] = 4

традиционным способом, чтобы суммировать эти матрицы, я бы сделал вложенный цикл:

int rows = a.length;
int cols = a[0].length;
double[][] res = new double[rows][cols];
for(int i = 0; i < rows; i++){
    for(int j = 0; j < cols; j++){
        res[i][j] = a[i][j] + b[i][j];
    }
}

Я новичок в потоковом API, но я думаю, что это очень удобно использовать с parallelStream, поэтому мой вопрос в том, есть ли способ сделать это и использовать параллельную обработку?

Изменить: не уверен, что это подходящее место, но здесь мы идем: Используя некоторые предложения, я поместил Stream в тест. Настройка была такой: Классический подход:

public class ClassicMatrix {

    private final double[][] components;
    private final int cols;
    private final int rows;




    public ClassicMatrix(final double[][] components){
    this.components = components;
    this.rows = components.length;
    this.cols = components[0].length;
    }


    public ClassicMatrix addComponents(final ClassicMatrix a) {
    final double[][] res = new double[rows][cols];
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < rows; j++) {
        res[i][j] = components[i][j] + a.components[i][j];
        }
    }
    return new ClassicMatrix(res);
    }

}

Использование предложения @dkatzel:

public class MatrixStream1 {

    private final double[][] components;
    private final int cols;
    private final int rows;

    public MatrixStream1(final double[][] components){
    this.components = components;
    this.rows = components.length;
    this.cols = components[0].length;
    }

    public MatrixStream1 addComponents(final MatrixStream1 a) {
    final double[][] res = new double[rows][cols];
    IntStream.range(0, rows*cols).parallel().forEach(i -> {
               int x = i/rows;
               int y = i%rows;

               res[x][y] = components[x][y] + a.components[x][y];
           });
    return new MatrixStream1(res);
    }
}

Используя предложение @Eugene:

public class MatrixStream2 {

    private final double[][] components;
    private final int cols;
    private final int rows;

    public MatrixStream2(final double[][] components) {
    this.components = components;
    this.rows = components.length;
    this.cols = components[0].length;
    }

    public MatrixStream2 addComponents(final MatrixStream2 a) {
    final double[][] res = new double[rows][cols];
    IntStream.range(0, rows)
        .forEach(i -> Arrays.parallelSetAll(res[i], j -> components[i][j] * a.components[i][j]));
    return new MatrixStream2(res);
    }
}

и тестовый класс, выполняющий 3 независимых раза по одному для каждого метода (просто заменив имя метода в main()):

public class MatrixTest {

    private final static String path = "/media/manuel/workspace/data/";

    public static void main(String[] args) {
    final List<Double[]> lst = new ArrayList<>();
    for (int i = 100; i < 8000; i = i + 400) {
        final Double[] d = testClassic(i); 
        System.out.println(d[0] + " : " + d[1]);
        lst.add(d);
    }
    IOUtils.saveToFile(path + "classic.csv", lst);
    }

    public static Double[] testClassic(final int i) {

    final ClassicMatrix a = new ClassicMatrix(rand(i));
    final ClassicMatrix b = new ClassicMatrix(rand(i));

    final long start = System.currentTimeMillis();
    final ClassicMatrix mul = a.addComponents(b);
    final long now = System.currentTimeMillis();
    final double elapsed = (now - start);

    return new Double[] { (double) i, elapsed };

    }

    public static Double[] testStream1(final int i) {

    final MatrixStream1 a = new MatrixStream1(rand(i));
    final MatrixStream1 b = new MatrixStream1(rand(i));

    final long start = System.currentTimeMillis();
    final MatrixStream1 mul = a.addComponents(b);
    final long now = System.currentTimeMillis();
    final double elapsed = (now - start);

    return new Double[] { (double) i, elapsed };

    }

    public static Double[] testStream2(final int i) {

    final MatrixStream2 a = new MatrixStream2(rand(i));
    final MatrixStream2 b = new MatrixStream2(rand(i));

    final long start = System.currentTimeMillis();
    final MatrixStream2 mul = a.addComponents(b);
    final long now = System.currentTimeMillis();
    final double elapsed = (now - start);

    return new Double[] { (double) i, elapsed };

    }

    private static double[][] rand(final int size) {
    final double[][] rnd = new double[size][size];
    for (int i = 0; i < size; i++) {
        for (int j = 0; j < size; j++) {
        rnd[i][j] = Math.random();
        }
    }
    return rnd;
    }
}

Результаты:

Classic Matrix size, Time (ms)
100.0,1.0
500.0,5.0
900.0,5.0
1300.0,43.0
1700.0,94.0
2100.0,26.0
2500.0,33.0
2900.0,46.0
3300.0,265.0
3700.0,71.0
4100.0,87.0
4500.0,380.0
4900.0,432.0
5300.0,215.0
5700.0,238.0
6100.0,577.0
6500.0,677.0
6900.0,609.0
7300.0,584.0
7700.0,592.0

Stream1, Time(ms)
100.0,86.0
500.0,13.0
900.0,9.0
1300.0,47.0
1700.0,92.0
2100.0,29.0
2500.0,33.0
2900.0,46.0
3300.0,253.0
3700.0,71.0
4100.0,90.0
4500.0,352.0
4900.0,373.0
5300.0,497.0
5700.0,485.0
6100.0,579.0
6500.0,711.0
6900.0,800.0
7300.0,780.0
7700.0,902.0

Stream2, Time(ms)
100.0,111.0
500.0,42.0
900.0,12.0
1300.0,54.0
1700.0,97.0
2100.0,110.0
2500.0,177.0
2900.0,71.0
3300.0,250.0
3700.0,106.0
4100.0,359.0
4500.0,143.0
4900.0,233.0
5300.0,261.0
5700.0,289.0
6100.0,406.0
6500.0,814.0
6900.0,830.0
7300.0,828.0
7700.0,911.0

Я сделал заговор для лучшего сравнения: Тест производительности

Нет никакого улучшения. Где ошибка? Являются ли матрицы малыми (7700 x 7700)? Более того, это взрывает память моего компьютера.

4b9b3361

Ответ 1

Один из способов сделать это можно с помощью Arrays.parallelSetAll:

int rows = a.length;
int cols = a[0].length;
double[][] res = new double[rows][cols];

Arrays.parallelSetAll(res, i -> {
    Arrays.parallelSetAll(res[i], j -> a[i][j] + b[i][j]);
    return res[i];
});

Я не уверен на 100%, но я думаю, что внутренний вызов Arrays.parallelSetAll может не стоить накладных расходов на создание внутренней распараллеливания для каждого столбца строки. Возможно, этого достаточно, чтобы распараллелить сумму только для каждой строки:

Arrays.parallelSetAll(res, i -> {
    Arrays.setAll(res[i], j -> a[i][j] + b[i][j]);
    return res[i];
});

В любом случае, вы должны тщательно измерить, прежде чем добавлять параллелизм в алгоритм, потому что много раз накладные расходы настолько велики, что его не стоит использовать.

Ответ 2

Это еще не измерено (я немного позже), но не должно ли уже построить в Arrays.parallelSetAll выполнить работу самым быстрым способом?

    for (int i = 0; i < a.length; ++i) {
        int j = i;
        Arrays.parallelSetAll(r[j], x -> a[j][x] + b[j][x]);
    }

Или даже приятнее:

IntStream.range(0, a.length)
         .forEach(i -> Arrays.parallelSetAll(r[i], j -> a[i][j] + b[i][j]));

Это очень хорошо сочетается с кэшами процессора, так как вероятность того, что следующая запись находится в одной и той же строке кэша, велика. Выполнение чтения в обратном порядке (столбцы и строки) будет рассеивать чтение по всему месту.

Я поставил jmh test здесь. Обратите внимание, что Федерико ответ является самым быстрым. Подумайте о своей идее.

Вот результаты:

Benchmark                 (howManyEntries)  Mode  Cnt    Score    Error  Units
DoubleArraySum.dkatzel                 100  avgt   10    0.055 ±  0.005  ms/op
DoubleArraySum.dkatzel                 500  avgt   10    0.997 ±  0.156  ms/op
DoubleArraySum.dkatzel                1000  avgt   10    4.162 ±  0.368  ms/op
DoubleArraySum.dkatzel                3000  avgt   10   39.619 ±  4.391  ms/op
DoubleArraySum.dkatzel                8000  avgt   10  236.468 ± 41.599  ms/op
DoubleArraySum.eugene                  100  avgt   10    0.671 ±  0.187  ms/op
DoubleArraySum.eugene                  500  avgt   10    6.317 ±  0.268  ms/op
DoubleArraySum.eugene                 1000  avgt   10   14.751 ±  0.676  ms/op
DoubleArraySum.eugene                 3000  avgt   10   65.174 ±  6.044  ms/op
DoubleArraySum.eugene                 8000  avgt   10  285.571 ± 23.206  ms/op
DoubleArraySum.federico1               100  avgt   10    0.169 ±  0.010  ms/op
DoubleArraySum.federico1               500  avgt   10    1.999 ±  0.217  ms/op
DoubleArraySum.federico1              1000  avgt   10    6.087 ±  1.108  ms/op
DoubleArraySum.federico1              3000  avgt   10   40.825 ±  4.853  ms/op
DoubleArraySum.federico1              8000  avgt   10  267.446 ± 37.490  ms/op
DoubleArraySum.federico2               100  avgt   10    0.034 ±  0.003  ms/op
DoubleArraySum.federico2               500  avgt   10    0.974 ±  0.152  ms/op
DoubleArraySum.federico2              1000  avgt   10    3.245 ±  0.080  ms/op
DoubleArraySum.federico2              3000  avgt   10   30.503 ±  5.960  ms/op
DoubleArraySum.federico2              8000  avgt   10  183.183 ± 21.861  ms/op
DoubleArraySum.holijava                100  avgt   10    0.063 ±  0.002  ms/op
DoubleArraySum.holijava                500  avgt   10    1.112 ±  0.020  ms/op
DoubleArraySum.holijava               1000  avgt   10    4.138 ±  0.062  ms/op
DoubleArraySum.holijava               3000  avgt   10   41.784 ±  1.029  ms/op
DoubleArraySum.holijava               8000  avgt   10  266.590 ±  4.080  ms/op
DoubleArraySum.pivovarit               100  avgt   10    0.112 ±  0.002  ms/op
DoubleArraySum.pivovarit               500  avgt   10    2.427 ±  0.075  ms/op
DoubleArraySum.pivovarit              1000  avgt   10    9.572 ±  0.355  ms/op
DoubleArraySum.pivovarit              3000  avgt   10   84.413 ±  2.197  ms/op
DoubleArraySum.pivovarit              8000  avgt   10  690.942 ± 34.993  ms/op

ИЗМЕНИТЬ

здесь более читаемый вывод (federico выигрывает со всеми входами)

100=[federico2, dkatzel, holijava, pivovarit, federico1, eugene]
500=[federico2, dkatzel, holijava, federico1, pivovarit, eugene]
1000=[federico2, holijava, dkatzel, federico1, pivovarit, eugene]
3000=[federico2, dkatzel, federico1, holijava, eugene, pivovarit]
8000=[federico2, dkatzel, holijava, federico1, eugene, pivovarit]

Ответ 3

Единственная опция, которую я вижу здесь, - это больше/меньше генерировать все возможные пары индексов, а затем извлекать элементы и применять суммирование. Использование параллельных потоков не будет иметь никакого дополнительного положительного эффекта здесь с таким небольшим примером, но вы можете с уверенностью использовать Stream API здесь (и сразу же конвертировать в параллель), хотя результат не так хорош, как ожидалось:

IntStream.range(0, a.length).boxed()
      .flatMap(i -> IntStream.range(0, a[0].length)
        .mapToObj(j -> new AbstractMap.SimpleImmutableEntry<>(i, j)))
      .parallel()
      .forEach(e -> {
          res[e.getKey()][e.getValue()]
            = a[e.getKey()][e.getValue()] + b[e.getKey()][e.getValue()];
      });

Нам нужно ввести посредника (middlepair?), чтобы мы могли распараллелить один Stream, а не играть с распараллеленным вложенным Streams.

Еще один расширенный способ - реализовать собственный пользовательский коллекционер, но в какой-то момент он будет включать в себя вложенную петлю.


Истинную силу Stream API можно наблюдать при попытке суммировать все значения из двух массивов:

Stream.concat(Arrays.stream(a), Arrays.stream(b)).parallel()
      .flatMapToDouble(Arrays::stream)
      .sum();

Ответ 4

Вы можете использовать IntStream для создания потока по числу ячеек в матрице, а затем выполнить некоторую математику для преобразования этого int в местоположение матрицы.

IntStream.range(0, rows*cols)
               .parallel()
               .forEach( i->{
                   int x = i/rows;
                   int y = i%rows;

                   res[x][y] = a[x][y] + b[x][y];
               });

Другие ответы на этот вопрос не только ошибочны (на момент написания этой статьи), но и создают несколько потоков, которые влияют на производительность, а также даже не параллельны.

Как отмечает @Holger, в то время как этот единственный поток может быть проще читать, затраты на производительность делений и модуля будут делать его медленнее, чем поток потоков с добавками только до тех пор, пока не будет много ядер. Я не уверен, сколько потребуется для компенсации

Ответ 5

Как насчет этого?

double[][] res = IntStream.range(0, a.length).parallel()
                          .mapToObj(i -> 
                                  IntStream.range(0, a[i].length)
                                           .mapToDouble(j -> a[i][j] + b[i][j])
                                           .toArray()
                          )
                          .toArray(double[][]::new);

System.out.println(res);
//                  ^--- [[2., 4.], [6., 8.]]