Подтвердить что ты не робот

Ограничивает ли Parallel.ForEach количество активных потоков?

С учетом этого кода:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Будет ли все 1000 потоков появляться почти одновременно?

4b9b3361

Ответ 1

Нет, он не запустит 1000 потоков - да, это ограничит количество потоков. Параллельные расширения используют соответствующее количество ядер, исходя из того, сколько у вас физически и сколько уже занято. Он выделяет работу для каждого ядра, а затем использует технику, называемую "кража работы", чтобы позволить каждому потоку эффективно обрабатывать свою собственную очередь и только нужно делать любой дорогостоящий сквозной доступ, когда это действительно необходимо.

Взгляните на Блог группы PFX для получения дополнительной информации о том, как он распределяет работу и все другие темы.

Обратите внимание, что в некоторых случаях вы также можете указать степень parallelism.

Ответ 2

На одном основном компьютере... Parallel.ForEach разделы (куски) коллекции, которые он работает между несколькими потоками, но это число вычисляется на основе алгоритма, который учитывает и, как представляется, постоянно контролирует работу выполняется потоками, которые он выделяет для ForEach. Поэтому, если основная часть ForEach вызывает длительные функции IO-привязки/блокировки, которые оставят поток в ожидании, алгоритм вызовет больше потоков и перераспределит коллекцию между ними. Если потоки завершаются быстро и не блокируются на потоках ввода-вывода, например, например, просто вычисляя некоторые числа, алгоритм будет увеличивать (или даже уменьшать) количество потоков до точки, где алгоритм считает оптимальным для пропускной способности (среднее завершение время каждой итерации).

В основном пул потоков за всеми различными функциями параллельной библиотеки будет выработать оптимальное количество потоков для использования. Количество физических процессорных ядер составляет только часть уравнения. Между количеством ядер и количеством порожденных нитей нет простой взаимосвязи между ними.

Я не считаю документацию вокруг отмены и обработки синхронизирующих потоков очень полезной. Надеюсь, MS может предоставить лучшие примеры в MSDN.

Не забывайте, что код тела должен быть написан для работы на нескольких потоках вместе со всеми обычными соображениями безопасности потока, рамки не абстрагируют этот фактор... пока.

Ответ 3

Это оптимальное количество потоков, основанных на количестве процессоров/ядер. Они не будут сразу появляться.

Ответ 5

Отличный вопрос. В вашем примере уровень распараллеливания довольно низок даже на четырехъядерном процессоре, но с некоторым ожиданием уровень распараллеливания может стать довольно высоким.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Теперь посмотрим, что произойдет, когда операция ожидания будет добавлена ​​для имитации HTTP-запроса.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я еще не внес изменений, и уровень concurrency/распараллеливания резко изменился. Concurrency может увеличить свой предел с помощью ParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я рекомендую установить ParallelOptions.MaxDegreeOfParallelism. Он не обязательно увеличит количество используемых потоков, но это обеспечит вам только запуск нормального количества потоков, что, по-видимому, вызывает у вас беспокойство.

Наконец, чтобы ответить на ваш вопрос, нет, вы не сразу получите все потоки. Используйте Parallel.Invoke, если вы хотите, чтобы вы открывали параллель, например, испытания условий гонки.

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}