Подтвердить что ты не робот

Как правильно распределить работу, сильно полагаясь на ввод-вывод

Я создаю консольное приложение, которое должно обрабатывать кучу данных.

В основном приложение захватывает ссылки из БД. Для каждой ссылки проанализируйте содержимое файла и внесите некоторые изменения. Файлы представляют собой файлы HTML, и этот процесс делает тяжелую работу с заменами RegEx (найти ссылки и преобразовать их в ссылки). Результаты затем сохраняются в файловой системе и отправляются во внешнюю систему.

Если я возобновляю процесс, следующим образом:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

Моя программа работает правильно, но довольно медленная. Поэтому я хочу распараллелить процесс.

В настоящее время я сделал простую параллелизацию, добавив AsParallel:

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});

Это простое изменение уменьшает длительность процесса (на 25% меньше времени). Тем не менее, я понимаю, что с распараллеливанием заключается в том, что при параллельном доступе к ресурсам, основанным на I/O, не будет много преимуществ (или, что еще хуже, меньше преимуществ), потому что входы/выходы не будут магически удваиваться.

Вот почему я думаю, что должен изменить свой подход, чтобы не распараллелить весь процесс, а создать зависимые связанные задачи с цепочкой.

I.E., я должен создать поток вроде:

Файл чтения в очередь. Когда закончите, Queue ParseHtml. По завершении очереди Очередь отправляется на WS и записывается локально. Когда закончите, запишите результат.

Однако я не знаю, как реализовать такой взгляд.

Я чувствую, что это закончится набором очередей потребителей/производителей, но я не нашел правильный образец.

И более того, я не уверен, что будут преимущества.

спасибо за советы

[Edit] На самом деле, я идеальный кандидат на использование С# 4.5... если только это было rtm:)

[Edit 2] Еще одна вещь, которая заставляет меня думать, что она неправильно распараллеливается, заключается в том, что в мониторе ресурсов я вижу графики нестабильной работы CPU, сетевого ввода-вывода и ввода-вывода. когда человек высок, другие - от низкого до среднего

4b9b3361

Ответ 1

Вы не используете какие-либо API-интерфейсы асинхронного ввода-вывода в любом из ваших кодов. Все, что вы делаете, связано с процессором, и все ваши операции ввода-вывода будут уничтожать блокирование ресурсов процессора. AsParallel предназначен для задач, связанных с вычислением, если вы хотите воспользоваться преимуществами асинхронного ввода-вывода, вам необходимо использовать API-интерфейсы асинхронной программируемой модели (APM) сегодня в <= v4.0. Это делается путем поиска методов BeginXXX/EndXXX на используемых вами классах ввода-вывода и использования их, когда они доступны.

Прочитайте это сообщение для стартеров: TPL TaskFactory.FromAsync vs Задачи с методами блокировки

Затем вы не хотите использовать AsParallel в этом случае. AsParallel включает потоковое вещание, которое приведет к немедленному планированию новой задачи для каждого элемента, но здесь вам не нужно/не нужно. Вам будет гораздо лучше обслуживать разделение работы с помощью Parallel::ForEach.

Посмотрите, как вы можете использовать это знание для достижения max concurrency в вашем конкретном случае:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

Теперь, здесь несколько примечаний:

  • Это пример кода, поэтому я использую буфер 1 МБ для чтения/записи файлов. Это чрезмерно для файлов HTML и расточительство системных ресурсов. Вы можете либо опустить его в соответствии с вашими максимальными потребностями, либо реализовать цепные чтения/записи в StringBuilder, который является упражнением, которое я оставляю вам, так как я буду писать более 500 строк кода, чтобы делать асинхронные чтения/записи.: P
  • Вы заметите, что в продолжении для задач чтения/записи у меня есть TaskContinuationOptions.AttachedToParent. Это очень важно, так как это предотвратит рабочий поток, который Parallel::ForEach запускает работу с момента завершения, до тех пор, пока все базовые вызовы async не будут завершены. Если бы этого не было, вы могли бы начать работу по всем 5000 элементам одновременно, что загрязняло бы подсистему TPL тысячами запланированных задач и вообще не масштабировалось.
  • Я вызываю SendToWs одновременно с записью файла в общий ресурс файла. Я не знаю, что является основой реализации SendToWs, но это тоже звучит как хороший кандидат для создания async. Прямо сейчас он принял на себя чистую вычислительную работу и, как таковой, собирается записывать поток ЦП при выполнении. Я оставляю это как упражнение для вас, чтобы выяснить, как лучше всего использовать то, что я показал вам, чтобы улучшить пропускную способность там.
  • Это все напечатанная свободная форма, и мой мозг был единственным компилятором здесь, а синтаксис синтаксиса SO - это все, что я использовал, чтобы убедиться, что синтаксис хорош. Поэтому, пожалуйста, простите любые ошибки синтаксиса и дайте мне знать, если я слишком сильно напортачил, что вы не можете сделать головы или хвосты, и я продолжу.

Ответ 2

Хорошей новостью является то, что ваша логика может быть легко разделена на шаги, которые входят в конвейер производителя-потребителя.

  • Шаг 1: Прочитать файл
  • Шаг 2: Файл анализа
  • Шаг 3: Запись файла
  • Шаг 4: SendToWs

Если вы используете .NET 4.0, вы можете использовать структуру данных BlockingCollection в качестве основы для каждой очереди производителей и потребителей. Основной поток будет помещать каждый рабочий элемент в очередь шагов 1, где он будет подбираться и обрабатываться, а затем перенаправляется на очередь шага 2 и т.д. И т.д.

Если вы готовы перейти к Async CTP, то вы можете воспользоваться новым TPL Dataflow для этого. Существует структура данных BufferBlock<T>, среди прочих, которая ведет себя аналогично BlockingCollection и хорошо интегрируется с новыми ключевыми словами async и await.

Поскольку ваш алгоритм привязан к IO, стратегии производителя-потребителя могут не дать вам повышение производительности, которое вы ищете, но по крайней мере у вас будет очень элегантное решение, которое будет хорошо масштабироваться, если вы сможете увеличить пропускную способность ввода-вывода. Я боюсь, что шаги 1 и 3 станут узкими местами, и трубопровод не будет хорошо балансировать, но стоит поэкспериментировать.

Ответ 3

Просто предложение, но вы заглянули в образец Consumer/Producer? Определенное количество потоков будет читать ваши файлы на диске и кормить контент в очередь. Тогда другой набор потоков, известный как потребители, "поглотит" очередь в качестве ее заполнения. http://zone.ni.com/devzone/cda/tut/p/id/3023

Ответ 4

Ваш лучший выбор в этом сценарии, безусловно, является моделью производителей-потребителей. Один поток, чтобы вытащить данные и кучу рабочих для его обработки. Там нет простого способа ввода/вывода, поэтому вы можете просто сосредоточиться на оптимизации самого вычисления.

Теперь я попытаюсь набросать модель:

// producer thread
var refs = GetReferencesFromDB(); // ~5000 Datarow returned

foreach(var ref in refs)
{
    lock(queue)
    {   
       queue.Enqueue(ref);
       event.Set();
    }

    // if the queue is limited, test if the queue is full and wait.
}

// consumer threads
while(true)
{
    value = null;
    lock(queue)
    {
       if(queue.Count > 0)
       {
           value = queue.Dequeue();
       }
    }        

    if(value != null) 
       // process value
    else        
       event.WaitOne(); // event to signal that an item was placed in the queue.           
}

Более подробную информацию о производителе/​​потребителе вы можете найти в части 4 Threading in С#: http://www.albahari.com/threading/part4.aspx

Ответ 5

Я думаю, что ваш подход к разделению списка файлов и обработке каждого файла в одной партии в порядке. Я чувствую, что вы можете получить больше прироста производительности, если играете со степенью parallelism. См.: var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16); это приведет к одновременной обработке 16 файлов. В настоящее время вы обрабатываете, вероятно, 2 или 4 файла в зависимости от количества ядер, которые у вас есть. Это эффективно только тогда, когда у вас есть только вычисления без ввода-вывода. Для интенсивной работы IO настройка может привести к невероятным улучшениям производительности, сокращая время простоя процессора.

Если вы собираетесь делиться и присоединяться к задачам обратно, используя пример производителя-потребителя, посмотрите на этот образец: Используя Parallel Linq Extensions для объединения двух последовательностей, как можно получить самые быстрые результаты в первую очередь?