Подтвердить что ты не робот

Различные результаты суммирования с Parallel.ForEach

У меня есть цикл foreach, который я распараллеливаю, и я заметил что-то странное. Код выглядит как

double sum = 0.0;

Parallel.ForEach(myCollection, arg =>
{
     sum += ComplicatedFunction(arg);
});

// Use sum variable below

Когда я использую обычный цикл foreach, я получаю разные результаты. Внутри ComplicatedFunction может быть что-то еще глубже, но возможно, что переменная sum подвержена неопределенному влиянию распараллеливания?

4b9b3361

Ответ 1

возможно, что на переменную суммы на неопределенный период влияет распараллеливание?

Да.
Доступ к double не является атомарным, а операция sum += ... никогда не является потокобезопасной, даже для типов, которые являются атомарными. Таким образом, у вас много условий гонки, и результат непредсказуем.

Вы можете использовать что-то вроде:

double sum = myCollection.AsParallel().Sum(arg => ComplicatedFunction(arg));

или, в более короткие обозначения

double sum = myCollection.AsParallel().Sum(ComplicatedFunction);

Ответ 2

Как и другие упомянутые ответы, обновление переменной sum из нескольких потоков (что и делает Parallel.ForEach) не является потокобезопасной операцией. Тривиальное исправление приобретения блокировки перед выполнением обновления устранит эту проблему.

double sum = 0.0;
Parallel.ForEach(myCollection, arg => 
{ 
  lock (myCollection)
  {
    sum += ComplicatedFunction(arg);
  }
});

Однако это порождает еще одну проблему. Поскольку блокировка получена на каждой итерации, это означает, что выполнение каждой итерации будет эффективно сериализовано. Другими словами, было бы лучше просто использовать простой старый цикл foreach.

Теперь трюк в правильном выборе заключается в том, чтобы разделить проблему на отдельные и независимые патроны. К счастью, это очень легко сделать, когда все, что вы хотите сделать, это суммирование результата итераций, потому что операция суммирования является коммутативной и ассоциативной, а промежуточные результаты итераций независимы.

Итак, вот как вы это делаете.

double sum = 0.0;
Parallel.ForEach(myCollection,
    () => // Initializer
    {
        return 0D;
    },
    (item, state, subtotal) => // Loop body
    {
        return subtotal += ComplicatedFunction(item);
    },
    (subtotal) => // Accumulator
    {
        lock (myCollection)
        {
          sum += subtotal;
        }
    });

Ответ 3

Если вы думаете об этом sum += ComplicatedFunction как фактически состоящем из нескольких операций, скажите:

r1 <- Load current value of sum
r2 <- ComplicatedFunction(...)
r1 <- r1 + r2

Итак, теперь мы случайным образом чередуем два (или более) параллельных экземпляра этого. Один поток может содержать устаревшее "старое значение" суммы, которое он использует для выполнения своего вычисления, результат которого он записывает поверх некоторой модифицированной версии суммы. Это классическое состояние гонки, потому что некоторые результаты теряются недетерминированным способом, основываясь на том, как выполняется перемежение.

Ответ 4

Или вы можете использовать параллельные операции агрегирования, как это определено в .Net. Вот код

        object locker = new object();
        double sum= 0.0;
        Parallel.ForEach(mArray,
                        () => 0.0,                 // Initialize the local value.
                        (i, state, localResult) => localResult + ComplicatedFunction(i), localTotal =>   // Body delegate which returns the new local total.                                                                                                                                           // Add the local value
                            {
                                lock (locker) sum4+= localTotal;
                            }    // to the master value.
                        );