Я что-то делаю неправильно или не удается извлечь файл zip параллельно?

Я создал это, чтобы проверить параллельный экстракт:

    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)

        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
            var path = Path.Combine(folder.FullName, entry.FullName);


        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using (var archive = ZipFile.OpenRead(file.FullName))
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            await block.Completion;


и unit test для тестирования:

    public async Task ExtractTestAsync()
        if (Resources.LocalExtractFolder.Exists)
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

С MaxDegreeOfParallelism = 1 все работает, но с 2 нет.

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source:    c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome:   Failed
Test Duration:  0:00:02.4138753

Result Message: 
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: 
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:  
at System.IO.Compression.Inflater.Decode()
   at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
   at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
   at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
   at System.IO.Stream.CopyTo(Stream destination)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
   at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

Обновление 2

Вот моя собственная попытка сделать это параллельно, она тоже не работает:) Не забудьте обрабатывать исключения в continueWith.

public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)

            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))

                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))

                    var task = Task.Run(() =>
                        var path = Path.Combine(folder.FullName, entry.FullName);

                    task.ContinueWith(handle =>
                            //do any cleanup/post processing
                            // Release the semaphore so the next thing can be processed
                    semaphore.WaitOne(); //Wait here until the last task completes.



И вот асинхронная версия:

public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
            return Task.Factory.StartNew(() =>
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))

                    var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))

                        var task = Task.Run(() =>
                            var path = Path.Combine(folder.FullName, entry.FullName);

                        task.ContinueWith(handle =>
                                //do any cleanup/post processing
                                // Release the semaphore so the next thing can be processed
                        },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.


Обновление 3

В дескрипторе выбрасываются следующие исключения. Исключение.

{"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."}

Необходимо выяснить, является ли ZipFile потокобезопасным или нет.


Ответ 1

Disclamer: единственное доказательство концепции.

Замена ZipFile.OpenRead с ParallelZipFile.OpenRead в образцах кода содержит все 4 unittests.

   public class ParallelZipFile
        public static ParallelZipArchive OpenRead(string path)

            return new ParallelZipArchive(ZipFile.OpenRead(path),path);
    public class ParallelZipArchive : IDisposable
        internal ZipArchive _archive;
        internal string _path;
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

        public ParallelZipArchive(ZipArchive zip,string path)
            _path = path;
            _archive = zip;

        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
                var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                int i = 0;
                foreach (var entry in _archive.Entries)
                    list.Add(new ParallelZipArchiveEntry(i++, entry, this));

                return  new ReadOnlyCollection<ParallelZipArchiveEntry>(list);

        public void Dispose()
            foreach (var archive in FreeReaders)
    public class ParallelZipArchiveEntry
        private ParallelZipArchive _parent;
        private int _entry;
        public string Name { get; set; }
        public string FullName { get; set; }

        public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;

        public void ExtractToFile(string path)
            ZipArchive value;
            Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));

            if (!_parent.FreeReaders.TryDequeue(out value))
                value = ZipFile.OpenRead(_parent._path);



модульные тесты

    public class ZipFileTests
        public static void PreInitialize(TestContext context)
            if (Resources.LocalExtractFolderTruth.Exists)

            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);

        public void InitializeTests()
            if (Resources.LocalExtractFolder.Exists)


        public void ExtractTest()


                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));

        public async Task ExtractAsyncTest()

            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        public void ExtractSemaphoreTest()

               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        public async Task ExtractSemaphoreAsyncTest()

            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));


Ответ 2

В последнее время работала над той же задачей, и вот мой результат:

Я использую DotNetZip.reduced(Ionic.Zip.Reduced.dll v1.9.1.8) на моем Xeon 1socket, 8cores, 16cpu; 32 ГБ оперативной памяти; накопитель SSD.

Почтовый файл | Упакованный размер | Нераспакованные файлы | Распакованный размер

  • SmallFile1 | 778 МБ | 4,926 Файлы | 1.4 ГБ
  • LargeFile2 | 6 ГБ | 29 557 файлов | 10.0 ГБ

У меня есть 5 методов: сначала делает все в одном потоке, а еще 4 использует PLINQ и TPL Parallel class.

Победители V4 и V5, которые работают на x6 быстрее, чем V1. Ниже приведены подробные результаты и код.

  • V1 использует ExtractAll
  • V2 Извлекает записи параллельно (не потокобезопасно)
  • V3 Извлекает записи параллельно, открывая новый дескриптор файла для каждой записи
  • V4 Извлекает записи параллельно, используя только файлы с файлами N + 1
  • V5 Окончательная версия

Таблица результатов эффективности Почтовый файл | V1, сек | V2, сек | V3, сек | V4, сек | V5, сек

  • SmallFile1 | 32 | Исключение | 8 | 8 | 5
  • LargeFile2 | 200 | Исключение | 2000 | 35 | 30

Малая обработка файлов Малая обработка файлов

Большая обработка файлов с помощью V1 Большая обработка файлов с помощью V1

Большая обработка файлов с помощью V4 Большая обработка файлов с помощью V4

Большая обработка файлов с помощью V5 Большая обработка файлов с помощью V5

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Ionic.Zip;
using Ionic.Zlib;

namespace A1
    public static class Program
        static void Main(string[] args)

            CancellationToken cancellationToken = CancellationToken.None;

            string path = @"e:\1\";
            string zf1 = Path.Combine(path, "1.zip");
            string zf2 = Path.Combine(path, "2.zip");
            Stopwatch sw = new Stopwatch();

            List<string> zipFiles = new List<string>
            List<Action<string, string, CancellationToken>> methods = new List<Action<string, string, CancellationToken>>


            zipFiles.ForEach(f => methods.ForEach(m =>
                string fileName = Path.GetFileName(f);
                string targetDirectory = path + Guid.NewGuid().ToString("N");
                // Unzip
                    m(f, targetDirectory, cancellationToken);
                catch (Exception ex)
                Console.WriteLine("{0} processed by {1} in {2} seconds", fileName, m.GetMethodInfo().Name, sw.Elapsed.TotalSeconds.ToString("F3"));
                Thread.Sleep(5 * 1000);
                Directory.Delete(targetDirectory, true);
                Thread.Sleep(5 * 1000);

        private static void ExtractAllFilesFromZipFileV1(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
            using (ZipFile zipFile = new ZipFile(zipFileName))

        private static void ExtractAllFilesFromZipFileV2(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
            using (ZipFile zipFile = new ZipFile(zipFileName))
                    .ForAll(v =>

        private static void ExtractAllFilesFromZipFileV3(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
            using (ZipFile zipFile = new ZipFile(zipFileName))
                int count = zipFile.Entries.Count;

                Enumerable.Range(0, count)
                    .ForAll(v =>

                        using (ZipFile zf = new ZipFile(zipFileName))
                            // Get the right entry to extract

        private static void ExtractAllFilesFromZipFileV4(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
            using (ZipFile zipFile = new ZipFile(zipFileName))
                // Get count of files, files and keep the lock on the file
                int count = zipFile.Entries.Count();

                // Cache instances of ZipFile used by threads
                // Make sure that we have only open zip file not more than N times, where N is maxDop.
                ConcurrentDictionary<int, ZipFile> dictionary = new ConcurrentDictionary<int, ZipFile>();

                    Parallel.For(0, count,
                        () =>
                            // GetOrAdd. Use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                                return new ZipFile(zipFileName);
                        (int i, ParallelLoopState loopState, ZipFile zf) =>

                            // Get the right entry to extract
                            ZipEntry entry = zf.Entries

                            // Extract to a file

                            return zf;
                        zf =>
                    // Dispose cached ZipFiles
                    foreach (ZipFile zf in dictionary.Values)
            } // using

        private static void ExtractAllFilesFromZipFileV5(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
            using (ZipFile zipFile = new ZipFile(zipFileName))
                // Get count of files, files and keep the lock on the file
                ICollection<ZipEntry> zipEntries = zipFile.Entries;
                int count = zipEntries.Where(v => !v.IsDirectory).Count();

                // Caclulate max DOP
                int maxDop = (int)1.5 * Math.Min(count, Environment.ProcessorCount);

                List<Tuple<int, long>> entries = zipEntries
                    .Select((v, i) => Tuple.Create(i, v))
                    .Where(v => !v.Item2.IsDirectory)
                    .Select(v => Tuple.Create(v.Item1, v.Item2.UncompressedSize))

                // Load balance between threads
                List<List<Tuple<int, long>>> groupedItems = entries.ToBuckets(maxDop, v => v.Item2 + 10 * 1024 * 1024).ToList();

                // Ensure seq reading from zip file.
                for (int i = 0; i < groupedItems.Count; ++i)
                    groupedItems[i] = groupedItems[i].OrderBy(v => v.Item1).ToList();

                // Cache instances of ZipFile used by threads
                // Make sure that we have open zip file not more than N times, where N is maxDop.
                ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>> dictionary = new ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>>(maxDop, maxDop);
                ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop, };

                    Parallel.For(0, maxDop, parallelOptions,
                        () =>
                            // GetOrAdd. Re-use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                                ZipFile zf = new ZipFile(zipFileName) { ExtractExistingFile = ExtractExistingFileAction.Throw, FlattenFoldersOnExtract = false, ZipErrorAction = ZipErrorAction.Throw, };
                                zf.ExtractProgress += (sender, e) =>
                                return Tuple.Create(zf, zf.Entries.ToList());
                        (int j, ParallelLoopState loopState, Tuple<ZipFile, List<ZipEntry>> zf) =>

                            List<Tuple<int, long>> list = groupedItems[j];
                            for (int n = 0; n < list.Count; ++n)

                                int i = list[n].Item1;

                                // Get the right entry to extract
                                ZipEntry entry = zf.Item2[i];
                                Debug.Assert(entry.UncompressedSize == list[n].Item2);

                                // Extract to a file

                            return zf;
                        zf =>
                    // Dispose cached ZipFiles
                    foreach (Tuple<ZipFile, List<ZipEntry>> zf in dictionary.Values)
                        catch (ZlibException)
                            // There is a well known defect in Ionic.ZLib
                            // This exception may happen when you read only part of file (not entire file)
                            // and close its handle.
                            // Ionic.Zlib.ZlibException: Bad CRC32 in GZIP trailer. (actual(D202EF8D)!=expected(A39D1010))

        private static IEnumerable<List<T>> ToBuckets<T>(this IEnumerable<T> list, int bucketCount, Func<T, long> getWeight)
            List<T> sortedList = list.OrderByDescending(v => getWeight(v)).ToList();

            List<long> runningTotals = Enumerable.Repeat(0L, bucketCount).ToList();
            List<List<T>> buckets = Enumerable.Range(0, bucketCount)
                .Select(v => new List<T>(sortedList.Count / bucketCount))

            foreach (T item in sortedList)
                // MinBy runningTotal
                int i = runningTotals.IndexOfMin();
                // Add to bucket
                runningTotals[i] += getWeight(item);

            return buckets;

        public static int IndexOfMin<T>(this IEnumerable<T> source, IComparer<T> comparer = null)
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            if (comparer == null)
                comparer = Comparer<T>.Default;

            using (IEnumerator<T> enumerator = source.GetEnumerator())
                if (!enumerator.MoveNext())
                    return -1; // or maybe throw InvalidOperationException

                int minIndex = 0;
                T minValue = enumerator.Current;

                int index = 0;
                while (enumerator.MoveNext())
                    if (comparer.Compare(enumerator.Current, minValue) < 0)
                        minIndex = index;
                        minValue = enumerator.Current;

                return minIndex;

        public static int IndexOfMinBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> selector, IComparer<TKey> comparer = null)
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            if (comparer == null)
                comparer = Comparer<TKey>.Default;

            using (IEnumerator<TSource> enumerator = source.GetEnumerator())
                if (!enumerator.MoveNext())
                    return -1; // or maybe throw InvalidOperationException

                int minIndex = 0;
                TKey minValue = selector(enumerator.Current);

                int index = 0;
                while (enumerator.MoveNext())
                    TKey value = selector(enumerator.Current);
                    if (comparer.Compare(value, minValue) < 0)
                        minIndex = index;
                        minValue = value;

                return minIndex;

Ответ 3

Я согласен с ответом Флости, но я предлагаю другой подход. Если ваши файлы похожи на ~ 50k внутри zip файла:

1) Создайте очередь байтовых массивов для каждого файла.  2) Каждый член очереди представляет собой извлеченную запись в zip файле.  3) Попытайтесь извлечь файлы из zip файла в массив байтов и после завершения извлечения добавьте его в очередь.  4) Линия вытяжки должна быть одной нитью, без parallelism.  5) Пока поток извлечения выполняет свою работу, создайте еще один поток/задачи для опорожнения очереди. Эти задачи будут извлекать данные из очереди и записывать их на диск. Поскольку они разные файлы, не будет никаких условий гонки или недоступных ресурсов.

Может потребоваться мьютекс или блокировка для очереди. Это может быть не самый лучший способ, но я уверен, что вы получите некоторую скорость.

Ответ 4

Мне потребовалась параллельная распаковка большого архива (~ 30 ГБ, ~ 45 тыс. Записей переменного размера), и я предложил это решение, используя DotNetZip:

    public static void ParallelExtract(
        string archivePath,
        string destinationPath,
        string password,
        CancellationToken token,
        ProgressReportDelegate progress // Could also be Progress<T> or whatever you prefer.
        if (String.IsNullOrEmpty(archivePath))
            throw new ArgumentNullException("archivePath");

        if (String.IsNullOrEmpty(destinationPath))
            throw new ArgumentNullException("destinationPath");

        Stopwatch elapsed = new Stopwatch();
        Stopwatch progressReportingTimer = new Stopwatch();


        object obj = new object();

        int count = -1;
        long bytesExtracted = 0;
        long bytesTotal = -1;

        List<Task> decompressors = new List<Task>();

        for (int i = 0; i < Environment.ProcessorCount; i++)
            decompressors.Add(Task.Run(() =>
                using (ZipFile zipFile = new ZipFile(archivePath))
                    if (!String.IsNullOrEmpty(password))
                        zipFile.Password = password;

                    zipFile.ExtractProgress += delegate (object zipSender, ExtractProgressEventArgs zipArgs)
                        // Report progress after each EntryBytesWritten event, as long as it been at least 250ms since the last report, so as to not overwhelm listeners like a progress bar. 
                        // Fire regardless upon completion (bytesExtracted == bytesTotal) to provide a final update before finishing.
                        if ((zipArgs.EventType == ZipProgressEventType.Extracting_EntryBytesWritten && progressReportingTimer.ElapsedMilliseconds >= 250) || bytesExtracted == bytesTotal)
                            int percentage = Percentage(bytesExtracted, bytesTotal);

                            lock (obj)
                                progress?.Invoke(); // <-- Handle your progress updates here.


                    // Block all threads until we sum the total size of all entries so that when we begin processing on the threadpool we 
                    // can report progress relative to the total.
                    lock (obj)
                        if (bytesTotal == -1)
                            foreach (var entry in zipFile.Entries)
                                bytesTotal += entry.CompressedSize;

                    var array = zipFile.Entries.ToArray();

                    int index;
                    ZipEntry zipEntry;

                    // Iterate through the archive entries sequentially despite being on multiple threads.
                    while (count < zipFile.Entries.Count && !token.IsCancellationRequested)
                        index = Interlocked.Increment(ref count);

                        if (index >= zipFile.Entries.Count)

                        zipEntry = array[index];

                        Interlocked.Add(ref bytesExtracted, zipEntry.CompressedSize);

                        zipEntry.Extract(destinationPath, ExtractExistingFileAction.OverwriteSilently);



Аппаратное обеспечение: Intel Core i7-4710HQ с тактовой частотой 3,50 ГГц (4 ядра, 8 с Hyper-Threading), 16 ГБ ОЗУ, (SATA) SSD, Win10x64 1903:

Архив: 44,5 тыс. Записей, ~ 30 ГБ, DEFLATE (магазин)

Threads:    Time:
1           35:20 (NOTE: This is virtually identical to ZipArchive.ExtractAll())
2           22:14
3           18:40
4           16:49
8           14:42

Ответ 5

Проблема заключается в том, что вы открываете файл только один раз только с одним дескриптором. Одна рукоятка имеет одно положение чтения, и позиция считывания перепутана, если вы выполняете параллельное чтение на одном и том же дескрипторе. Открывайте файл несколько раз с помощью нескольких дескрипторов, и вы должны быть в порядке.