Параллельный GZip Декомпрессия файлов журнала - настройка MaxDegreeOfParallelism для максимальной пропускной способности - программирование
Подтвердить что ты не робот

Параллельный GZip Декомпрессия файлов журнала - настройка MaxDegreeOfParallelism для максимальной пропускной способности

У нас есть до 30 ГБ файлов журнала GZipped в день. Каждый файл содержит 100 000 строк и составляет от 6 до 8 МБ при сжатии. Упрощенный код, в котором логическая синтаксическая разборка была удалена, использует цикл Parallel.ForEach.

Максимальное количество обработанных пиков в MaxDegreeOfParallelism 8 на двух-NUMA node, 32 логических процессорных ящиках (Intel Xeon E7-2820 @2 ГГц):

using System;

using System.Collections.Concurrent;

using System.Linq;
using System.IO;
using System.IO.Compression;

using System.Threading.Tasks;

namespace ParallelLineCount
{
    public class ScriptMain
    {
        static void Main(String[] args)
        {
            int    maxMaxDOP      = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
            string fileLocation   = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
            string filePattern    = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
            string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";

            Console.WriteLine("Start:                 {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
            Console.WriteLine("Processing file(s):    {0}", filePattern);
            Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
            Console.WriteLine("");

            Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");

            for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
            {

                // Construct ConcurrentStacks for resulting strings and counters
                ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
                ConcurrentStack<int>   TotalFiles = new ConcurrentStack<int>();

                DateTime FullStartTime = DateTime.Now;

                string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);

                var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };

                //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
                Parallel.ForEach(files, options, currentFile =>
                    {
                        string filename = System.IO.Path.GetFileName(currentFile);
                        DateTime fileStartTime = DateTime.Now;

                        using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
                        {
                            Int64 lines = 0, someBookLines = 0, length = 0;
                            String line = "";

                            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                            {
                                while (!reader.EndOfStream)
                                {
                                    line = reader.ReadLine();
                                    lines++; // total lines
                                    length += line.Length;  // total line length

                                    if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
                                }

                                TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
                                TotalFiles.Push(1); // silly way to count processed files :)
                            }
                        }
                    }
                );

                TimeSpan runningTime = DateTime.Now - FullStartTime;

                // Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
                Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
                    maxDOP.ToString(),
                    TotalFiles.Sum().ToString(),
                    Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
                    TotalLength.Sum().ToString(),
                    TotalLines.Sum(),
                    TotalSomeBookLines.Sum().ToString(),
                    Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
                    Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());

            }
            Console.WriteLine();
            Console.WriteLine("Finish:                " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
        }
    }
}

Здесь сводка результатов с явным пиком при MaxDegreeOfParallelism = 8:

enter image description here

Загрузка процессора (показанная здесь в совокупности, большая часть нагрузки была на одном NUMA node, даже если DOP находилась в диапазоне от 20 до 30):

enter image description here

Единственный способ, с помощью которого я могу перегрузить нагрузку на процессор, составляет 95%, - это разбить файлы на 4 разных папки и выполнить ту же команду 4 раза, каждый из которых нацелен на подмножество всех файлов.

Может ли кто-нибудь найти узкое место?

4b9b3361

Ответ 1

Вероятно, одна проблема - это небольшой размер буфера, используемый конструктором FileStream по умолчанию. Я предлагаю вам использовать больший входной буфер. Например:

using (FileStream infile = new FileStream(
    name, FileMode.Open, FileAccess.Read, FileShare.None, 65536))

Размер буфера по умолчанию - 4 килобайта, в котором поток вызывает много вызовов подсистемы ввода-вывода для заполнения своего буфера. Буфер 64K означает, что вы будете делать эти вызовы гораздо реже.

Я обнаружил, что размер буфера между 32 КБ и 256 КБ дает наилучшую производительность, а 64К - это "сладкое пятно", когда я некоторое время тестировал некоторое тестирование. Размер буфера, превышающий 256 КБ, фактически начинает снижать производительность.

Кроме того, хотя это вряд ли будет иметь большое влияние на производительность, вы, вероятно, должны заменить те экземпляры ConcurrentStack на 64-битные целые числа и использовать Interlocked.Add или Interlocked.Increment для их обновления. Это упрощает ваш код и устраняет необходимость управления коллекциями.

Update:

Перечитав описание проблемы, я был поражен этим утверждением:

Единственный способ, которым я нашел, чтобы увеличить загрузку процессора на 95%, - это разделить файлы в 4 разных папках и выполнить ту же команду 4 раз, каждый из которых нацелен на подмножество всех файлов.

Это, для меня, указывает на узкое место при открытии файлов. Как будто ОС использует взаимную блокировку исключения в каталоге. И даже если все данные находятся в кеше и там не требуется физический ввод-вывод, процессам по-прежнему приходится ждать этой блокировки. Также возможно, что файловая система записывает на диск. Помните, что он должен обновлять время последнего доступа для файла, когда он открывается.

Если I/O действительно является узким местом, вы можете подумать о том, что есть единственный поток, который ничего не делает, кроме загрузки файлов и вставляет их в структуру BlockingCollection или аналогичную структуру, так что потоки обработки не должны конкурировать с друг друга для блокировки в каталоге. Ваша заявка становится выражением производителя/потребителя с одним производителем и потребителями N.

Ответ 2

Причиной этого обычно является то, что потоки слишком сильно синхронизируются.

Ищете синхронизацию в вашем коде, я вижу сильную синхронизацию в коллекциях. Ваши потоки нажимают линии по отдельности. Это означает, что каждая строка берет в лучшем случае блокированную операцию, и в худшем случае блокировка режима ядра ждет. Переблокированные операции будут сильно зависеть от того, что все потоки будут проходить, чтобы получить свою текущую строку в коллекции. Все они пытаются обновить те же места в памяти. Это вызывает pinging строки строки.

Измените это, чтобы нажимать строки в больших кусках. Настройте линейные массивы по 100 строк или более. Чем больше, тем лучше.

Другими словами, сначала собирайте результаты в локально-потоковой коллекции и только редко объединяйтесь в глобальные результаты.

Возможно, вы даже захотите избавиться от ручного ввода данных. Это то, для чего производится PLINQ: Поток данных одновременно. PLINQ абстрактно удаляет все совпадающие манипуляции с коллекциями.

Ответ 3

Я не думаю, что вам поможет параллелизация чтения с диска. Фактически, это может серьезно повлиять на вашу производительность, создавая конкуренцию при чтении из нескольких областей хранения в одно и то же время.

Я бы реструктурировал программу, чтобы сначала выполнить однопотоковое чтение необработанных файлов в поток памяти byte []. Затем выполните Parallel.ForEach() для каждого потока или буфера для распаковки и подсчета строк.

Вы берете начальную запись ввода-вывода IO, но пусть OS/аппаратное обеспечение оптимизирует, надеюсь, в основном последовательные чтения, затем распаковывает и анализирует в памяти.

Имейте в виду, что такие операции, как decprless, Encoding.UTF8.ToString(), String.Split() и т.д. будут использовать большие объемы памяти, поэтому очистите ссылки на старые хранилища и удалите их, поскольку они вам больше не нужны.

Я был бы удивлен, если вы не можете заставить машину генерировать серьезные отходы, попадающие таким образом.

Надеюсь, что это поможет.

Ответ 4

Проблема, я думаю, в том, что вы используете блокирующий ввод-вывод, поэтому ваши потоки не могут полностью использовать преимущества parallelism.

Если я правильно понимаю ваш алгоритм (извините, я больше парень из С++), это то, что вы делаете в каждом потоке (псевдокод):

while (there is data in the file)
    read data
    gunzip data

Вместо этого лучший подход будет примерно таким:

N = 0
read data block N
while (there is data in the file)
    asyncRead data block N+1
    gunzip data block N
    N = N + 1
gunzip data block N

Вызов asyncRead не блокируется, поэтому в основном у вас есть декодирование блока N, происходящее одновременно с чтением блока N + 1, поэтому к тому моменту, когда вы закончите блок декодирования N, у вас может быть блок N + 1 готов (или близок к готовности, если I/O работает медленнее декодирования).

Тогда это просто вопрос нахождения размера блока, который дает вам лучшую пропускную способность.

Удачи.