Подтвердить что ты не робот

Почему Parallel.ForEach намного быстрее, чем AsParallel(). ForAll(), хотя MSDN предлагает иное?

Я провел некоторое исследование, чтобы понять, как мы можем создать многопоточное приложение, которое запускается через дерево.

Чтобы выяснить, как это можно реализовать наилучшим образом, я создал тестовое приложение, которое запускается через мой диск C:\и открывает все каталоги.

class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunction(startDirectory);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '\\');
        //Console.WriteLine(depth + ": " + currentDirectory);

        try
        {
            var children = Directory.GetDirectories(currentDirectory);

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }

        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

Однако я столкнулся с тем, что при выполнении этого в режиме 3 (Parallel.ForEach) код завершается примерно за 2,5 секунды (да, у меня есть SSD;)). Выполнение кода без распараллеливания завершается примерно за 8 секунд. Запуск кода в режиме 2 (AsParalle.ForAll()) занимает почти бесконечное время.

При проверке в проводнике процессов я также сталкиваюсь с несколькими странными фактами:

Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds

Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???

Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

Что я нахожу особенно странным, так это то, что Parallel.ForEach, похоже, игнорирует любые родительские потоки/задачи, которые все еще выполняются, пока AsParallel(). ForAll(), кажется, ожидает завершения предыдущей задачи (что не скоро все родительские задачи все еще ожидают завершения своих дочерних задач).

Кроме того, я прочитал в MSDN: "Предпочитаю ForAll для ForEach, когда это возможно"

Источник: http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

Кто-нибудь знает, почему это может быть?

Изменить 1:

По просьбе Мэтью Уотсона я сначала загрузил дерево в память, а затем прошел через него. Теперь загрузка дерева выполняется последовательно.

Результаты, однако, одинаковы. Unparallelized и Parallel.ForEach теперь завершают все дерево примерно за 0,05 секунды, в то время как AsParallel(). ForAll по-прежнему работает только на 1 шаг в секунду.

Код:

class Program
{
    private static DirWithSubDirs RootDir;

    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");


        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunctionInMemory(RootDir);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }        

    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '\\');
        Console.WriteLine(depth + ": " + currentDirectory.Path);

        var children = currentDirectory.SubDirs;

        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;

        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}

class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }

    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

Изменить 2:

Прочитав обновление комментария Мэтью, я попытался добавить в программу следующий код:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

Это, однако, не меняет способ преобразования AsParallel. Тем не менее, первые 8 шагов выполняются за мгновение до замедления до 1 шага в секунду.

(Дополнительное примечание: в настоящее время я игнорирую исключения, возникающие, когда я не могу получить доступ к каталогу с помощью блока Try Catch вокруг Directory.GetDirectories())

Изменить 3:

Также меня больше всего интересует разница между Parallel.ForEach и AsParallel.ForAll, потому что для меня просто странно, что по какой-то причине второй создает один поток для каждой рекурсии, в то время как первый обрабатывает все примерно в 30 потоках. Максимум. (А также, почему MSDN предлагает использовать AsParallel, несмотря на то, что он создает столько потоков с таймаутом ~ 1 секунда)

Изменить 4:

Еще одна странная вещь, которую я узнал: Когда я пытаюсь установить значение MinThreads в пуле потоков выше 1023, оно, похоже, игнорирует значение и масштабируется примерно до 8 или 16: ThreadPool.SetMinThreads(1023, 16);

Тем не менее, когда я использую 1023, он делает первые 1023 элемента очень быстро, а затем возвращается к медленному темпу, который я испытывал все время.

Примечание. Также буквально создано более 1000 потоков (по сравнению с 30 для всего Parallel.ForEach).

Означает ли это, что Parallel.ForEach намного умнее в обработке задач?

Еще немного информации: этот код печатается дважды 8 - 8, когда вы устанавливаете значение выше 1023: (Когда вы устанавливаете значение 1023 или ниже, он печатает правильное значение)

        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);

        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);

        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

Изменить 5:

По запросу Дина я создал еще один случай для ручного создания задач:

case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

Это также быстро, как цикл Parallel.ForEach(). Поэтому у нас до сих пор нет ответа на вопрос, почему AsParallel(). ForAll() намного медленнее.

4b9b3361

Ответ 1

Эта проблема довольно отлаживаемая, необычная роскошь, когда у вас проблемы с потоками. Основным инструментом здесь является окно Debug > Windows > Threads debugger. Показывает активные потоки и дает вам быстрый взгляд на трассировку стека. Вы легко поймете, что, как только он станет медленным, вы будете иметь десятки активных потоков, которые все застряли. Их трассировка стека выглядит одинаково:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

Всякий раз, когда вы видите что-то подобное, вы должны немедленно подумать о проблемах пожарного шланга. Вероятно, третья самая распространенная ошибка с потоками, после гонок и тупиков.

Что вы можете объяснить, теперь, когда вы знаете причину, проблема с кодом заключается в том, что каждый поток, который завершает, добавляет N больше потоков. Где N - среднее количество подкаталогов в каталоге. По сути, число потоков растет экспоненциально, что всегда плохо. Он останется только под контролем, если N = 1, что, конечно же, никогда не происходит на типичном диске.

Остерегайтесь, что, как и почти любая проблема с потоками, это неправильное поведение имеет тенденцию плохо повторяться. SSD в вашей машине имеет тенденцию скрывать его. Так же как и операционная система в вашей машине, программа может быстро завершить работу и устранить проблему во второй раз, когда вы ее запустите. Поскольку вы сейчас будете читать из кеша файловой системы, а не из диска, очень быстро. Трюки с ThreadPool.SetMinThreads() также скрывает его, но он не может его исправить. Он никогда не исправляет никаких проблем, он только скрывает их. Поскольку независимо от того, что происходит, экспоненциальное число всегда будет подавлять заданное минимальное количество потоков. Вы можете только надеяться, что он завершит завершение итерации диска до того, как это произойдет. Нежелательная надежда для пользователя с большим диском.

Различие между ParallelEnumerable.ForAll() и Parallel.ForEach() теперь, возможно, также легко объясняется. Вы можете сказать из трассировки стека, что ForAll() делает что-то непослушное, метод RunSynchronously() блокирует до тех пор, пока все потоки не будут завершены. Блокировка - это то, что потоки threadpool не должны делать, это замаскирует пул потоков и не позволит ему планировать процессор для другого задания. И эффект, который вы наблюдаете, пул потоков быстро перегружен потоками, которые ждут завершения N других потоков. Что не происходит, они ждут в пуле и не получают запланированного, потому что их уже так много.

Это сценарий взаимоблокировки, довольно распространенный, но диспетчер threadpool имеет обходное решение для него. Он отслеживает активные потоки потоков и шаги, когда они не завершаются своевременно. Затем он позволяет запустить дополнительный поток, что больше, чем минимальное значение, заданное SetMinThreads(). Но не более того максимальный набор SetMaxThreads(), имеющий слишком много активных потоков tp, является рискованным и, вероятно, вызывает OOM. Это устраняет тупик, он получает один из вызовов ForAll() для завершения. Но это происходит очень медленно, поток threadpool делает это только два раза в секунду. У вас не будет терпения, прежде чем он настигнет.

Параллельный .ForEach() не имеет этой проблемы, он не блокируется, поэтому не воспламеняется пул.

Кажется, это решение, но имейте в виду, что ваша программа по-прежнему запускает память вашего компьютера, добавляя в пул все больше ожидающих потоков tp. Это также может привести к сбою вашей программы, это просто не так, потому что у вас много памяти, и threadpool не использует ее много для отслеживания запроса. Некоторые программисты, тем не менее, выполняют это также.

Решение очень простое, просто не используйте потоки. Это вредно, нет concurrency, если у вас только один диск. И это не нравится, когда управляется несколькими потоками. Особенно плохо на приводе шпинделя, голова стремится очень, очень медленно. SSD делают это намного лучше, но тем не менее требуется 50 минут на 50 секунд, накладные расходы, которые вам просто не нужны или не нужны. Идеальное количество потоков для доступа к диску, который вы не можете ожидать в кэшировании, всегда один.

Ответ 2

Первое, что нужно отметить, это то, что вы пытаетесь выполнить параллельную операцию с привязкой к IO, что значительно исказит тайминги.

Второе, что нужно отметить, - это характер параллельных задач: вы рекурсивно спускаетесь к дереву каталогов. Если вы создаете несколько потоков для этого, каждый поток, вероятно, будет одновременно обращаться к другой части диска - что приведет к тому, что голова чтения диска будет прыгать повсюду и значительно замедлить работу.

Попробуйте изменить тест, чтобы создать дерево в памяти, и получить доступ к ним с несколькими потоками. Затем вы сможете правильно сравнить тайминги, если результаты не будут искажены вне всякой полезности.

Кроме того, вы можете создавать большое количество потоков, и они будут (по умолчанию) быть потоками threadpool. Наличие большого количества потоков фактически замедлит работу, когда они превысят количество процессорных ядер.

Также обратите внимание, что когда вы превысите минимальные потоки пула потоков (определенные ThreadPool.GetMinThreads()), задержка вводится диспетчером пула потоков между каждым новым созданием потока threadpool. (Я думаю, что это около 0,5 с за новый поток).

Кроме того, если число потоков превышает значение, возвращаемое ThreadPool.GetMaxThreads(), создающий поток будет блокироваться до выхода из одного из других потоков. Я думаю, что это, вероятно, произойдет.

Вы можете проверить эту гипотезу, вызвав ThreadPool.SetMaxThreads() и ThreadPool.SetMinThreads(), чтобы увеличить эти значения и посмотреть, не имеет ли значения.

(Наконец, обратите внимание, что если вы действительно пытаетесь рекурсивно спуститься с C:\, вы почти наверняка получите исключение IO, когда оно достигнет папки с защищенной ОС.)

ПРИМЕЧАНИЕ. Установите потоки потоков max/min, как это:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

Follow Up

Я попробовал свой тестовый код с подсчетом потока threadpool, как описано выше, со следующими результатами (не выполняющимися на всем моем диске C: \, но на меньшем подмножестве):

  • Режим 1 занял 06,5 секунды.
  • Режим 2 занял 15,7 секунды.
  • Режим 3 занял 16,4 секунды.

Это соответствует моим ожиданиям; добавление загрузки потоков для этого фактически делает его медленнее, чем однопоточное, а два параллельных подхода принимают примерно одно и то же время.


В случае, если кто-то еще хочет исследовать это, вот какой-то определительный тестовый код (код OP не воспроизводится, потому что мы не знаем его структуру каталогов).

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static DirWithSubDirs RootDir;

        private static void Main()
        {
            Console.WriteLine("Loading file system into memory...");
            RootDir = new DirWithSubDirs("Root", 4, 4);
            Console.WriteLine("Done");

            //ThreadPool.SetMinThreads(4000, 16);
            //ThreadPool.SetMaxThreads(4000, 16);

            var w = Stopwatch.StartNew();
            ThisIsARecursiveFunctionInMemory(RootDir);

            Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
            Console.ReadKey();
        }

        public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
        {
            var depth = currentDirectory.Path.Count(t => t == '\\');
            Console.WriteLine(depth + ": " + currentDirectory.Path);

            var children = currentDirectory.SubDirs;

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunctionInMemory(child);
                    }
                    break;

                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;

                default:
                    break;
            }
        }
    }

    internal class DirWithSubDirs
    {
        public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();

        public String Path { get; private set; }

        public DirWithSubDirs(String path, int width, int depth)
        {
            this.Path = path;

            if (depth > 0)
                for (int i = 0; i < width; ++i)
                    SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1));
        }
    }
}

Ответ 3

Методы Parallel.For и .ForEach реализованы внутри как эквивалент выполнения итераций в задачах, например, что цикл, как:

Parallel.For(0, N, i => 
{ 
  DoWork(i); 
});

эквивалентно:

var tasks = new List<Task>(N); 
for(int i=0; i<N; i++) 
{ 
tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i)); 
} 
Task.WaitAll(tasks.ToArray());

И с точки зрения каждой итерации, потенциально работающей параллельно с любой другой итерацией, это нормальная ментальная модель, но в реальности этого не происходит. Параллельно, на самом деле, не обязательно использовать одну задачу на одну итерацию, поскольку это значительно больше накладных расходов, чем необходимо. Parallel.ForEach пытается использовать минимальное количество задач, необходимое для максимально быстрого завершения цикла. Он раскручивает задачи, когда потоки становятся доступными для обработки этих задач, и каждая из этих задач участвует в схеме управления (я думаю, это называется чанкинг): задача запрашивает выполнение нескольких итераций, получает их, а затем обрабатывает работающие и затем возвращается к большему. Размеры чанков зависят от количества участвующих задач, нагрузки на машину и т.д.

PLINQ. AsParallel() имеет другую реализацию, но он все еще может аналогичным образом извлекать несколько итераций во временное хранилище, выполнять вычисления в потоке (но не в виде задачи) и помещать результаты запроса в небольшой буфер. (Вы получаете что-то, основанное на ParallelQuery, а затем и другие .Wh независимо() функции связываются с альтернативным набором методов расширения, которые обеспечивают параллельные реализации).

Теперь, когда у нас есть небольшое представление о том, как работают эти два механизма, я постараюсь дать ответ на ваш оригинальный вопрос:

Так почему же .AsParallel() медленнее, чем Parallel.ForEach? Причина кроется в следующем. Задачи (или их эквивалентная реализация здесь) блокируют НЕ вызовы, подобные вводу-выводу. Они ждут и освобождают процессор, чтобы заняться чем-то другим. Но (цитируя краткую книгу по С#): "PLINQ не может выполнять работу, связанную с вводом/выводом, не блокируя потоки". Звонки синхронные. Они были написаны с намерением увеличить степень параллелизма, если (и ТОЛЬКО если) вы выполняете такие вещи, как загрузка веб-страниц для каждой задачи, которые не нагружают процессорное время.

И причина, по которой ваши вызовы функций в точности аналогичны вызовам, связанным с вводом/выводом, заключается в следующем: один из ваших потоков (назовем его T) блокирует и ничего не делает, пока все его дочерние потоки не завершатся, что может быть медленный процесс здесь. Сама T не сильно нагружает процессор, пока ждет разблокировки детей, она ничего не делает, только ждет. Следовательно, он идентичен типичному вызову функции, связанной с вводом/выводом.

Ответ 4

На основе принятого ответа на Как работает AsParallel?

.AsParallel.ForAll() возвращается к IEnumerable перед вызовом .ForAll()

поэтому он создает 1 новый поток + N рекурсивных вызовов (каждый из которых генерирует новый поток).