Подтвердить что ты не робот

Parallel.ForEach продолжает генерировать новые потоки

Пока я использовал Parallel.ForEach в своей программе, я обнаружил, что некоторые потоки никогда не заканчивались. Фактически, он продолжал размножать новые темы снова и снова, поведение, которого я не ожидал и определенно не хотел.

Я смог воспроизвести это поведение со следующим кодом, который, как и моя "настоящая" программа, часто использует процессор и память (код .NET 4.0):

public class Node
{
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
        Previous = previous;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        DateTime startMoment = DateTime.Now;
        int concurrentThreads = 0;

        var jobs = Enumerable.Range(0, 2000);
        Parallel.ForEach(jobs, delegate(int jobNr)
        {
            Interlocked.Increment(ref concurrentThreads);

            int heavyness = jobNr % 9;

            //Give the processor and the garbage collector something to do...
            List<Node> nodes = new List<Node>();
            Node current = null;
            for (int y = 0; y < 1024 * 1024 * heavyness; y++)
            {
                current = new Node(current);
                nodes.Add(current);
            }

            TimeSpan elapsed = DateTime.Now - startMoment;
            int threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
            Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining);
        });
    }
}

При запуске на моем четырехъядерном процессоре сначала начинается с 4 одновременных потоков, как и следовало ожидать. Однако со временем создается больше потоков. В конце концов, эта программа затем выдает исключение OutOfMemoryException:

[00:00] Job    0 complete. 3 threads remaining.
[00:01] Job    1 complete. 4 threads remaining.
[00:01] Job    2 complete. 4 threads remaining.
[00:02] Job    3 complete. 4 threads remaining.
[00:05] Job    9 complete. 5 threads remaining.
[00:05] Job    4 complete. 5 threads remaining.
[00:05] Job    5 complete. 5 threads remaining.
[00:05] Job   10 complete. 5 threads remaining.
[00:08] Job   11 complete. 5 threads remaining.
[00:08] Job    6 complete. 5 threads remaining.
...
[00:55] Job   67 complete. 7 threads remaining.
[00:56] Job   81 complete. 8 threads remaining.
...
[01:54] Job  107 complete. 11 threads remaining.
[02:00] Job  121 complete. 12 threads remaining.
..
[02:55] Job  115 complete. 19 threads remaining.
[03:02] Job  166 complete. 21 threads remaining.
...
[03:41] Job  113 complete. 28 threads remaining.
<OutOfMemoryException>

График использования памяти для эксперимента выше выглядит следующим образом:

Processor and memory usage

(Снимок экрана на голландском языке, верхняя часть - использование процессора, использование памяти нижней части). Как вы можете видеть, похоже, что новый поток генерируется почти каждый раз, когда сборщик мусора мешает (как можно увидеть в результате падения памяти).

Может кто-нибудь объяснить, почему это происходит, и что я могу с этим поделать? Я просто хочу, чтобы .NET прекратила создавать новые потоки и сначала закончила существующие потоки...

4b9b3361

Ответ 1

Вы можете ограничить максимальное количество создаваемых потоков, указав экземпляр ParallelOptions с набором свойств MaxDegreeOfParallelism:

var jobs = Enumerable.Range(0, 2000);
ParallelOptions po = new ParallelOptions
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(jobs, po, jobNr =>
{
    // ...
});

Что касается того, почему вы наблюдаете поведение, которое вы наблюдаете: TPL (который лежит в основе PLINQ) по умолчанию позволяет угадать оптимальное количество потоков для использования. Всякий раз, когда параллельная задача блокируется, планировщик задач может создать новый поток, чтобы поддерживать прогресс. В вашем случае блокировка может происходить неявно; например, через вызов Console.WriteLine или (как вы заметили) во время сбора мусора.

От Concurrency Настройка уровней с параллельной библиотекой задач (Сколько потоков необходимо использовать?):

Так как политика по умолчанию TPL заключается в использовании одного потока на процессор, мы можем заключить, что TPL изначально предполагает, что рабочая нагрузка задачи составляет ~ 100%, и 0% ожидания, и если исходное предположение терпит неудачу, и задача входит в состояние ожидания (т.е. начинается блокировка) - TPL с освобождением, чтобы добавить потоки соответственно.

Ответ 2

Вероятно, вы должны немного прочитать о том, как работает планировщик задач.

http://msdn.microsoft.com/en-us/library/ff963549.aspx (последняя половина страницы)

"Пул потоков .NET автоматически управляет числом рабочих потоки в бассейне. Он добавляет и удаляет потоки в соответствии со встроенным эвристики. Пул потоков .NET имеет два основных механизма для инъекций threads: механизм предотвращения голодания, который добавляет рабочие потоки, если он не видит прогресса в отношении поставленных в очередь предметов и подъема холма эвристика, которая пытается максимизировать пропускную способность при использовании как нескольких потоков насколько это возможно.

Цель предотвращения голода заключается в предотвращении взаимоблокировки. Этот вид взаимоблокировка может произойти, когда рабочий поток ожидает синхронизации событие, которое может быть удовлетворено только элементом работы, который все еще находится на рассмотрении в глобальном или локальном очередях пула потоков. Если бы было зафиксировано количество рабочих потоков, и все эти потоки были одинаково заблокирован, система не сможет добиться дальнейшего прогресса. Добавление нового рабочего потока устраняет проблему.

Цель эвристики восхождения на холм - улучшить использование когда потоки блокируются вводом-выводом или другими условиями ожидания, которые остановите процессор. По умолчанию пул управляемых потоков имеет один рабочий поток на ядро. Если один из этих рабочих потоков становится заблокирован, есть вероятность того, что ядро ​​может быть недостаточно использовано, в зависимости от общей рабочей нагрузки компьютера. Впрыск нити логика не различает поток, который заблокирован и поток что обеспечивает длительную, интенсивную работу процессора. Следовательно, всякий раз, когда глобальные или локальные очереди пула потоков содержат отложенную работу элементы, активные рабочие элементы, которые занимают много времени (больше, чем половина секунды) может инициировать создание нового сотрудника пула потоков нити".

Вы можете отметить задачу как LongRunning, но это имеет побочный эффект выделения потока для него из-за пула потоков, что означает, что задача не может быть встроена.

Помните, что ParallelFor рассматривает работу, заданную как блоки, поэтому, даже если работа в одном цикле довольно мала, общая работа, выполняемая задачей, вызываемой просмотром, может оказаться более длинной для планировщика.

Большинство вызовов GC в и из них сами по себе не блокируются (он работает в отдельном потоке), но если вы дождитесь завершения GC, то это блокирует. Помните также, что GC переупорядочивает память, так что это может иметь некоторые побочные эффекты (и блокирование), если вы пытаетесь выделить память при запуске GC. По этой причине у меня нет особенностей, но я знаю, что PPL имеет некоторые функции выделения памяти специально для параллельного управления памятью по этой причине.

Глядя на ваш вывод кода, кажется, что все работает в течение многих секунд. Поэтому я не удивлен тем, что вы видите поток инъекций. Однако, похоже, я помню, что размер пула потоков по умолчанию составляет примерно 30 потоков (возможно, в зависимости от количества ядер в вашей системе). Поток занимает примерно до МБ памяти, прежде чем ваш код выделяет больше, поэтому я не понимаю, почему здесь можно получить исключение из памяти.

Ответ 3

Я опубликовал следующий вопрос Как подсчитать количество параллельных потоков в приложении .NET?

Если подсчитывать потоки напрямую, их число в Parallel.For() в основном ((очень редко и незначительно уменьшается) только увеличивается и не восстанавливается после завершения цикла.

Проверено это как в режиме Release, так и Debug, с

ParallelOptions po = new ParallelOptions
{
  MaxDegreeOfParallelism = Environment.ProcessorCount
};

и без

Цифры меняются, но выводы одинаковы.

Вот готовый код, который я использовал, если кто-то хочет играть с:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edit4Posting
{
public class Node
{

  public Node Previous { get; private set; }
  public Node(Node previous)
  {
    Previous = previous;
    }
  }
  public class Edit4Posting
  {

    public static void Main(string[] args)
    {
      int concurrentThreads = 0;
      int directThreadsCount = 0;
      int diagThreadCount = 0;

      var jobs = Enumerable.Range(0, 160);
      ParallelOptions po = new ParallelOptions
      {
        MaxDegreeOfParallelism = Environment.ProcessorCount
      };
      Parallel.ForEach(jobs, po, delegate(int jobNr)
      //Parallel.ForEach(jobs, delegate(int jobNr)
      {
        int threadsRemaining = Interlocked.Increment(ref concurrentThreads);

        int heavyness = jobNr % 9;

        //Give the processor and the garbage collector something to do...
        List<Node> nodes = new List<Node>();
        Node current = null;
        //for (int y = 0; y < 1024 * 1024 * heavyness; y++)
        for (int y = 0; y < 1024 * 24 * heavyness; y++)
        {
          current = new Node(current);
          nodes.Add(current);
        }
        //*******************************
        directThreadsCount = Process.GetCurrentProcess().Threads.Count;
        //*******************************
        threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
        Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}",
          jobNr, threadsRemaining, directThreadsCount);
      });
      Console.WriteLine("FINISHED");
      Console.ReadLine();
    }
  }
}