Обходной путь для ограничения пропускной способности WaitHandle.WaitAll 64? - программирование
Подтвердить что ты не робот

Обходной путь для ограничения пропускной способности WaitHandle.WaitAll 64?

Мое приложение порождает множество небольших рабочих потоков через ThreadPool.QueueUserWorkItem, которые я отслеживаю через несколько экземпляров ManualResetEvent. Я использую метод WaitHandle.WaitAll, чтобы заблокировать мое приложение от закрытия до тех пор, пока эти потоки не будут завершены.

У меня никогда не было никаких проблем, прежде чем, поскольку мое приложение загружается с большей нагрузкой, то есть больше создаваемых потоков, теперь я начинаю получать это исключение:

WaitHandles must be less than or equal to 64 - missing documentation

Какое лучшее альтернативное решение?

Фрагмент кода

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

Обходной путь

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();
4b9b3361

Ответ 1

Создайте переменную, которая отслеживает количество запущенных задач:

int numberOfTasks = 100;

Создать сигнал:

ManualResetEvent signal = new ManualResetEvent(false);

Уменьшить количество задач при выполнении задачи:

if (Interlocked.Decrement(ref numberOftasks) == 0)
{

Если задачи не осталось, установите сигнал:

    signal.Set();
}

Между тем, где-то еще, дождитесь, когда будет установлен сигнал:

signal.WaitOne();

Ответ 2

Начиная с .NET 4.0, у вас есть еще две (и IMO, чистые) опции, доступные вам.

Во-первых, используйте класс CountdownEvent. Это предотвращает необходимость обращения к приращению и уменьшению самостоятельно:

int tasks = <however many tasks you're performing>;

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(() => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

Однако есть еще более надежное решение, и чтобы использовать класс Task:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    }

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Использование класса Task и вызов WaitAll намного более чисты, IMO, поскольку вы скручиваете меньше примитивов на протяжении всего кода (обратите внимание, нет команд ожидания); вам не нужно настраивать счетчик, обрабатывать приращение/декрементирование, вы просто настраиваете свои задачи, а затем ждите их. Это позволяет коду быть более выразительным в том, что вы хотите сделать, а не в примитивах того, как (по крайней мере, с точки зрения управления его распараллеливанием).

.NET 4.5 предлагает еще больше параметров, вы можете упростить генерацию последовательности экземпляров Task, вызвав метод static Run в классе Task:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Tasks.WaitAll(tasks);

Или вы могли бы воспользоваться библиотекой TPL DataFlow (она в пространстве имен System, поэтому она официальная, даже если это загрузка из NuGet, например Entity Framework) и используйте ActionBlock<TInput>, например:

// Create the action block.  Since there not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

Обратите внимание, что ActionBlock<TInput> по умолчанию обрабатывает по одному элементу за раз, поэтому, если вы хотите, чтобы он обрабатывал несколько действий за один раз, вам нужно установить количество параллельных элементов, которые вы хотите обработать в конструкторе, передав a ExecutionDataflowBlockOptions и установить свойство MaxDegreeOfParallelism:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

Если ваше действие действительно безопасно в потоке, вы можете установить для свойства MaxDegreeOfParallelsim значение DataFlowBlockOptions.Unbounded:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});

Суть в том, что у вас есть мелкомасштабный контроль над тем, насколько параллельны вам ваши варианты.

Конечно, если у вас есть последовательность элементов, которые вы хотите передать в свой экземпляр ActionBlock<TInput>, вы можете связать реализацию ISourceBlock<TOutput>, чтобы ActionBlock<TInput>, например:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  Since there not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true)
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

В зависимости от того, что вам нужно сделать, библиотека потока данных TPL становится гораздо более привлекательной, поскольку она обрабатывает concurrency по всем связанным задачам, и это позволяет вам быть очень конкретными только о том, хотите, чтобы каждая часть была, при сохранении правильного разделения проблем для каждого блока.

Ответ 3

Ваше обходное решение неверно. Причина в том, что Set и WaitOne могут участвовать в гонке, если последний рабочий элемент заставляет threadCount перейти на ноль, прежде чем поток очереди будет иметь шанс поставить очередь всех рабочих элементов. Исправление прост. Рассматривайте свою очередь, как если бы это был рабочий элемент. Инициализируйте threadCount до 1 и выполните декремент и сигнал, когда очередь завершена.

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

В качестве личного предпочтения мне нравится использовать класс CountdownEvent для подсчета для меня.

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 

Ответ 4

Добавляя к ответу dtb, вы можете обернуть его в класс простой.

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}

Ответ 5

Добавление в ответ dtb, когда мы хотим иметь обратные вызовы.

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}

Ответ 6

protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}

Ответ 7

Я решил это, просто разбивая количество событий, чтобы ждать, не потеряв много потерянных результатов, и отлично работает в производственной среде. Выполняет код:

        var events = new List<ManualResetEvent>();

        // code omited

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omited

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }

Ответ 8

Вот еще одно решение. Вот "события" представляет собой список ManualResetEvent. Размер списка может быть больше 64 (MAX_EVENTS_NO).

int len = events.Count;
if (len <= MAX_EVENTS_NO)
    {
        WaitHandle.WaitAll(events.ToArray());
    } else {
        int start = 0;
        int num = MAX_EVENTS_NO;
        while (true)
            {
                if(start + num > len)
                {
                   num = len - start;
                }
                List<ManualResetEvent> sublist = events.GetRange(start, num);
                WaitHandle.WaitAll(sublist.ToArray());
                start += num;
                if (start >= len)
                   break;
           }
   }

Ответ 9

Windows XP SP3 поддерживает максимум два WaitHandles. Для случаев более 2 приложений WaitHandles преждевременно завершается.