Подтвердить что ты не робот

С# Threading - Чтение и хэширование нескольких файлов одновременно, самый простой способ?

Я пытаюсь получить то, что, по моему мнению, является самой простой формой потоковой обработки для работы в моем приложении, но я просто не могу этого сделать.

Что я хочу сделать: у меня есть основная форма со статусной полосой и индикатор выполнения. Мне нужно прочитать что-то между 3 и 99 файлами и добавить их хэши в строку [], которую я хочу добавить в список всех файлов с соответствующими хэшами. Впоследствии я должен сравнить элементы в этом списке с базой данных (которая поступает в текстовые файлы). Как только все это будет сделано, я должен обновить текстовое поле в основной форме, а прогресс - до 33%; в основном я просто не хочу, чтобы основная форма зависала во время обработки.

Файлы, с которыми я работаю, всегда суммируются до 1,2 ГБ (+/- несколько МБ), то есть я должен уметь читать их в байт [] s и обрабатывать их оттуда (я должен вычислить CRC32, MD5 и SHA1 каждого из этих файлов, чтобы они были быстрее, чем чтение всех из них с жесткого диска 3 раза).

Также следует отметить, что некоторые файлы могут быть 1 МБ, а другой - 1 ГБ. Сначала мне захотелось создать 99 потоков для 99 файлов, но это кажется неразумным, я полагаю, было бы лучше повторно использовать потоки небольших файлов, в то время как большие потоки файлов все еще работают. Но это звучит довольно сложно для меня, поэтому я не уверен, что и этот мудрый.

До сих пор я пробовал workThreads и backgroundWorkers, но ни один из них не работает слишком хорошо для меня; по крайней мере, фоновые рабочие работали НЕКОТОРЫЕ из времени, но я даже не могу понять, почему они не будут в других случаях... в любом случае основная форма все еще застыла. Теперь я прочитал о параллельной библиотеке задач в .NET 4.0, но я подумал, что лучше спросить кого-то, кто знает, что он делает, прежде чем тратить больше времени на это.

То, что я хочу сделать, выглядит примерно так (без потоковой передачи):

List<string[]> fileSpecifics = new List<string[]>();

int fileMaxNumber = 42; // something between 3 and 99, depending on file set

for (int i = 1; i <= fileMaxNumber; i++)
{
    string fileName = "C:\\path\\to\\file" + i.ToString("D2") + ".ext"; // file01.ext - file99.ext
    string fileSize = new FileInfo(fileName).Length.ToString();
    byte[] file = File.ReadAllBytes(fileName);
    // hash calculations (using SHA1CryptoServiceProvider() etc., no problems with that so I'll spare you that, return strings)
    file = null; // I didn't yet check if this made any actual difference but I figured it couldn't hurt
    fileSpecifics.Add(new string[] { fileName, fileSize, fileCRC, fileMD5, fileSHA1 });
}

// look for files in text database mentioned above, i.e. first check for "file bundles" with the same amount of files I have here; then compare file sizes, then hashes
// again, no problems with that so I'll spare you that; the database text files are pretty small so parsing them doesn't need to be done in an extra thread.

Кто-нибудь был бы любезен, чтобы указать мне в правильном направлении? Я ищу самый простой способ быстро прочитать и хэш файлы (я считаю, что хеширование занимает некоторое время, в течение которого другие файлы уже могут быть прочитаны) и сохранить вывод в строке [] без замораживания основной формы, ничего более, не менее.

Я благодарен за любой ввод.

РЕДАКТИРОВАТЬ, чтобы уточнить: "работающие фоновые рабочие дни" Я имел в виду, что (для одного и того же набора файлов), возможно, первое и четвертое выполнение моего кода производит правильный вывод, а пользовательский интерфейс размораживается в течение 5 секунд, для второго, третьего и пятого исполнения он замораживает форму (и через 60 секунд я получаю сообщение об ошибке, в котором говорится, что какой-то поток не ответил в течение этого временного кадра), и мне нужно прекратить выполнение через VS.

Спасибо за все ваши предложения и указатели, как вы все правильно догадались, что я совершенно новичок в потоковом режиме, и вам нужно будет ознакомиться с замечательными ссылками, которые вы, ребята, разместили. Тогда я дам эти методы попробовать и обозначить ответ, который помог мне больше всего. Еще раз спасибо!

4b9b3361

Ответ 1

С .NET Framework 4.X

  • Использовать Directory.EnumerateFiles Метод для эффективного/ленивого перечисления файлов
  • Используйте Parallel.For() для делегирования parallelism работать с базой PLINQ или использовать TPL для делегирования отдельной задачи для каждого этапа цикла
  • Используйте шаблон трубопроводов для конвейера следующих этапов: вычисление хэш-кодов, сравнение с шаблоном, обновление пользовательского интерфейса
  • Чтобы избежать зависания пользовательского интерфейса, используйте соответствующие методы: для WPF используйте Dispatcher.BeginInvoke(), для WinForms используйте Invoke(), см. этот ответ SO
  • Учитывая, что у всего этого есть пользовательский интерфейс, может быть полезно добавить некоторую функцию отмены, чтобы отказаться от продолжительной работы, если нужно, взгляните на CreateLinkedTokenSource, который позволяет запускать CancellationToken из "внешней области" Я могу попробовать добавить пример, но стоит сделать это самостоятельно, чтобы вы узнали все это, а не просто скопировали/вставляли → заработали → забыли об этом.

PS: Должен читать - Трубопроводная бумага в MSDN


Спецификация конкретного трубопровода TPL

  • Реализация схемы трубопровода: три этапа: вычисление хэша, совпадение, обновление интерфейса
  • Три задачи, по одному на этап
  • Две блокирующие очереди

//

// 1) CalculateHashesImpl() should store all calculated hashes here
// 2) CompareMatchesImpl() should read input hashes from this queue
// Tuple.Item1 - hash, Typle.Item2 - file path
var calculatedHashes = new BlockingCollection<Tuple<string, string>>();


// 1) CompareMatchesImpl() should store all pattern matching results here
// 2) SyncUiImpl() method should read from this collection and update 
//    UI with available results
var comparedMatches = new BlockingCollection<string>();

var factory = new TaskFactory(TaskCreationOptions.LongRunning,
                              TaskContinuationOptions.None);


var calculateHashesWorker = factory.StartNew(() => CalculateHashesImpl(...));
var comparedMatchesWorker = factory.StartNew(() => CompareMatchesImpl(...));
var syncUiWorker= factory.StartNew(() => SyncUiImpl(...));

Task.WaitAll(calculateHashesWorker, comparedMatchesWorker, syncUiWorker);

CalculateHashesImpl():

private void CalculateHashesImpl(string directoryPath)
{
   foreach (var file in Directory.EnumerateFiles(directoryPath))
   {
       var hash = CalculateHashTODO(file);
       calculatedHashes.Add(new Tuple<string, string>(hash, file.Path));
   }
}

CompareMatchesImpl():

private void CompareMatchesImpl()
{
   foreach (var hashEntry in calculatedHashes.GetConsumingEnumerable())
   {
      // TODO: obviously return type is up to you
      string matchResult = GetMathResultTODO(hashEntry.Item1, hashEntry.Item2);
      comparedMatches.Add(matchResult);
   }
}

SyncUiImpl():

private void UpdateUiImpl()
{
    foreach (var matchResult in comparedMatches.GetConsumingEnumerable())
    {
        // TODO: track progress in UI using UI framework specific features
        // to do not freeze it
    }
}

TODO: рассмотрите возможность использования CancellationToken в качестве параметра для всех вызовов GetConsumingEnumerable(), чтобы вы могли легко остановить выполнение конвейера при необходимости.

Ответ 2

Прежде всего, вы должны использовать более высокий уровень абстракции для решения этой проблемы. У вас есть множество задач для завершения, поэтому используйте абстракцию "задача". Вы должны использовать параллельную библиотеку задач для этого. Пусть TPL справится с вопросом о том, сколько рабочих потоков будет создано - ответ может быть таким же низким, как один, если работа зашифровывается при вводе-выводе.

Если вы хотите сделать свою собственную нить, несколько советов:

  • Никогда не блокируйте поток пользовательского интерфейса. Это то, что замораживает ваше приложение. Придумайте протокол, с помощью которого рабочие потоки могут взаимодействовать с вашим потоком пользовательского интерфейса, который затем ничего не делает, кроме как отвечать на события пользовательского интерфейса. Помните, что методы управления пользовательским интерфейсом, такие как панели завершения задачи, никогда не должны вызываться никаким другим потоком, отличным от потока пользовательского интерфейса.

  • Не создавайте 99 потоков для чтения 99 файлов. Это похоже на получение 99 писем и наем 99 помощников для написания ответов: необычайно дорогое решение простой проблемы. Если ваша работа имеет интенсивность процессора, тогда нет смысла "нанимать" больше потоков, чем у вас есть процессоры для их обслуживания. (Это похоже на наем 99 помощников в офисе, в котором есть только четыре стола. Помощники проводят большую часть своего времени, ожидая стола, чтобы сидеть, а не читать почту.) Если ваша работа носит дисковый характер, то большинство этих потоков идут чтобы простаивать большую часть времени, ожидая диск, что представляет собой еще большую трату ресурсов.

Ответ 3

Во-первых, я надеюсь, что вы используете встроенную библиотеку для вычисления хэшей. Возможно написать свои собственные, но гораздо безопаснее использовать что-то, что было вокруг какое-то время.

Вам может потребоваться создать столько потоков, сколько процессоров, если ваш процесс будет интенсивным. Если он связан с I/O, вы можете уйти с большим количеством потоков.

Я не рекомендую загружать весь файл в память. Ваша хеширующая библиотека должна поддерживать обновление куска за раз. Прочитайте фрагмент в памяти, используйте его для обновления хэшей каждого алгоритма, прочитайте следующий фрагмент и повторите до конца файла. Разделенный подход поможет снизить потребности в программной памяти.

Как предложили другие, загляните в параллельную библиотеку задач, в частности Данные Parallelism. Это может быть так просто:

Parallel.ForEach(fileSpecifics, item => CalculateHashes(item));

Ответ 4

Отметьте поток данных TPL. Вы можете использовать дросселируемый ActionBlock, который будет управлять сложной частью для вас.

Ответ 5

Если я понимаю, что вы хотите выполнить некоторые задачи в фоновом режиме, а не блокировать свой пользовательский интерфейс, тогда пользовательский интерфейс BackgroundWorker будет подходящим выбором. Вы упомянули, что у вас это работает некоторое время, поэтому моя рекомендация будет заключаться в том, чтобы взять то, что у вас было в полуработающем состоянии, и улучшить его, отслеживая неудачи. Если моя догадка правильная, ваш работник бросает исключение, которое, как представляется, не обрабатывается в вашем коде. Необработанные исключения, которые выходят из их содержащих потоков, делают плохие вещи.

Ответ 6

Этот код хэширует один файл (поток) с использованием двух задач - один для чтения, второй для хэширования, для более надежного способа вы должны читать больше фрагментов вперед.

Поскольку пропускная способность процессора намного выше, чем на диске, если вы не используете какой-либо высокоскоростной флеш-накопитель, вы ничего не получаете от хэширования большего количества файлов одновременно.

public void TransformStream(Stream a_stream, long a_length = -1)
{
    Debug.Assert((a_length == -1 || a_length > 0));

    if (a_stream.CanSeek)
    {
        if (a_length > -1)
        {
            if (a_stream.Position + a_length > a_stream.Length)
                throw new IndexOutOfRangeException();
        }

        if (a_stream.Position >= a_stream.Length)
            return;
    }

    System.Collections.Concurrent.ConcurrentQueue<byte[]> queue =
        new System.Collections.Concurrent.ConcurrentQueue<byte[]>();
    System.Threading.AutoResetEvent data_ready = new System.Threading.AutoResetEvent(false);
    System.Threading.AutoResetEvent prepare_data = new System.Threading.AutoResetEvent(false);

    Task reader = Task.Factory.StartNew(() =>
    {
        long total = 0;

        for (; ; )
        {
            byte[] data = new byte[BUFFER_SIZE];
            int readed = a_stream.Read(data, 0, data.Length);

            if ((a_length == -1) && (readed != BUFFER_SIZE))
                data = data.SubArray(0, readed);
            else if ((a_length != -1) && (total + readed >= a_length))
                data = data.SubArray(0, (int)(a_length - total));

            total += data.Length;

            queue.Enqueue(data);
            data_ready.Set();

            if (a_length == -1)
            {
                if (readed != BUFFER_SIZE)
                    break;
            }
            else if (a_length == total)
                break;
            else if (readed != BUFFER_SIZE)
                throw new EndOfStreamException();

            prepare_data.WaitOne();
        }
    });

    Task hasher = Task.Factory.StartNew((obj) =>
    {
        IHash h = (IHash)obj;
        long total = 0;

        for (; ; )
        {
            data_ready.WaitOne();

            byte[] data;
            queue.TryDequeue(out data);

            prepare_data.Set();

            total += data.Length;

            if ((a_length == -1) || (total < a_length))
            {
                h.TransformBytes(data, 0, data.Length);
            }
            else
            {
                int readed = data.Length;
                readed = readed - (int)(total - a_length);
                h.TransformBytes(data, 0, data.Length);
            }

            if (a_length == -1)
            {
                if (data.Length != BUFFER_SIZE)
                    break;
            }
            else if (a_length == total)
                break;
            else if (data.Length != BUFFER_SIZE)
                throw new EndOfStreamException();
        }
    }, this);

    reader.Wait();
    hasher.Wait();
}

Остальная часть кода здесь: http://hashlib.codeplex.com/SourceControl/changeset/view/71730#514336