Подтвердить что ты не робот

Подождите, пока пустые потоки завершатся

Прошу прощения за излишний вопрос. Тем не менее, я нашел много решений для своей проблемы, но ни один из них не очень хорошо объяснил. Я надеюсь, что здесь будет ясно.

Мой основной поток приложения С# порождает 1..n фоновых работников, использующих ThreadPool. Я хочу, чтобы исходный поток был заблокирован, пока все рабочие не завершили работу. Я изучил ManualResetEvent, в частности, но я не понимаю его использования.

В псевдо:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

При необходимости я буду знать количество рабочих, которые должны быть поставлены в очередь перед началом работы.

4b9b3361

Ответ 1

Попробуйте это. Функция принимает список делегатов Action. Он добавит запись рабочего потока ThreadPool для каждого элемента в списке. Он будет ожидать завершения каждого действия перед возвратом.

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[actions.Count()];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}

Ответ 2

Здесь другой подход - инкапсуляция; поэтому ваш код может быть таким же простым, как:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Если класс Forker приведен ниже (мне стало скучно в поезде; -p)... опять же, это позволяет избежать объектов ОС, но обертывает вещи довольно аккуратно (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}

Ответ 3

Во-первых, как долго выполняются работники? потоки пулов обычно должны использоваться для краткосрочных задач - если они будут работать некоторое время, рассмотрите ручные потоки.

Возьмем проблему; вам действительно нужно блокировать основной поток? Можете ли вы использовать обратный вызов? Если да, то что-то вроде:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

Это довольно легкий (без примитивов ОС) способ отслеживания рабочих.

Если вы нуждаетесь для блокировки, вы можете сделать то же самое с помощью Monitor (опять же, избегая объекта ОС):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");

Ответ 4

Я использовал новую библиотеку параллельных задач в CTP здесь:

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });

Ответ 5

Вот решение, использующее класс CountdownEvent.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Конечно, если у вас есть доступ к классу CountdownEvent, то у вас есть весь TPL для работы. Класс Parallel заботится о вас.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });

Ответ 6

Я думаю, что вы были на правильном пути с помощью ManualResetEvent. Эта ссылка содержит пример кода, который точно соответствует тому, что вы пытаетесь сделать. Ключ должен использовать WaitHandle.WaitAll и передать массив событий ожидания. Каждый поток должен установить одно из этих событий ожидания.

   // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Убедитесь, что на пути кода вашего рабочего потока вы установили событие (то есть autoEvents 1.Set();). Как только все они будут сигнализированы, waitAll вернется.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}

Ответ 8

Использование .NET 4.0 Barrie r class:

        Barrier sync = new Barrier(1);

        foreach(var o in collection)
        {
            WaitCallback worker = (state) => 
            {
                // do work
                sync.SignalAndWait();
            };

            sync.AddParticipant();
            ThreadPool.QueueUserWorkItem(worker, o);
        }

        sync.SignalAndWait();