Подтвердить что ты не робот

Как подождать одно событие в С#, с тайм-аутом и отменой

Таким образом, мое требование состоит в том, чтобы моя функция ожидала, что первый экземпляр event Action<T> исходит из другого класса и другого потока, и обработает его в моем потоке, позволяя ожидать прерывания либо таймаутом, либо CancellationToken.

Я хочу создать общую функцию, которую я могу повторно использовать. Мне удалось создать пару вариантов, которые (я думаю), что мне нужно, но оба кажутся более сложными, чем я предполагаю, что это должно быть.

Использование

Чтобы быть понятным, пример использования этой функции будет выглядеть так, где serialDevice выплескивает события в отдельном потоке:

var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());

Вариант 1-ManualResetEventSlim

Эта опция не плохая, но обработка Dispose ManualResetEventSlim более грязная, чем кажется. Это дает ReSharper, что я получаю доступ к модифицированным/удаленным вещам в закрытии, и по-настоящему трудно следовать, поэтому я даже не уверен, что это правильно. Может быть, там что-то мне не хватает, что может убрать это, что было бы для меня предпочтительным, но я не вижу его вслух. Вот код.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result => 
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock(o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}

Вариант 2-опрос без WaitHandle

Функция WaitForSingleEvent здесь намного чище. Я могу использовать ConcurrentQueue и, следовательно, даже не нуждаюсь в блокировке. Но мне просто не нравится функция опроса Sleep, и я не вижу никакого способа обойти это с помощью этого подхода. Я бы хотел передать WaitHandle вместо Func<bool>, чтобы очистить Sleep, но второй, который я делаю, у меня есть целый трюк Dispose, чтобы очистить его снова.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}

Вопрос

Я не особо забочусь ни о одном из этих решений, ни я на 100% уверен, что любой из них на 100% прав. Является ли одно из этих решений лучше, чем другое (идиоматичность, эффективность и т.д.), Или есть более простой способ или встроенная функция для удовлетворения того, что мне нужно делать здесь?

Обновление: лучший ответ до сих пор

Модификация решения TaskCompletionSource ниже. Нет длинных затворов, замков или чего-либо еще. Кажется довольно простым. Какие-нибудь ошибки здесь?

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}

Обновление 2: Еще одно отличное решение

Оказывается, что BlockingCollection работает так же, как ConcurrentQueue, но также имеет методы, принимающие токен и маркер отмены. Одна из приятных вещей в этом решении заключается в том, что он может быть обновлен, чтобы сделать WaitForNEvents довольно легко:

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }   
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}
4b9b3361

Ответ 1

Вы можете использовать Rx для преобразования события в наблюдаемое, затем в задачу и, наконец, дождаться этой задачи с помощью вашего токена/таймаута.

Одно из преимуществ этого решения над любым из существующих решений состоит в том, что он вызывает unsubscribe в потоке событий, гарантируя, что ваш обработчик не будет вызываться дважды. (В вашем первом решении вы обходите это с помощью tcs.TrySetResult вместо tcs.SetResult, но всегда приятно избавиться от "TryDoSomething" и просто гарантировать, что DoSomething всегда работает).

Другим преимуществом является простота кода. Это по существу одна строка. Поэтому вам даже не нужна отдельная функция. Вы можете встроить его так, чтобы было более ясно, что именно делает ваш код, и вы можете делать вариации темы без необходимости тонны необязательных параметров (например, ваш дополнительный initializer или разрешить ожидание N событий или отказаться от тайм-аутов/отмены в случаях, когда они не нужны). И когда вы закончите, вы получите как значение bool return val, так и фактическое result, если это будет полезно вообще.

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}

Ответ 2

Вы можете использовать TaskCompletetionSource для создания Task, который вы можете отметить как выполненный или отмененный. Здесь возможная реализация для конкретного события:

public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        target.MyEvent -= handler;
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        target.MyEvent -= handler;
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    target.MyEvent += handler;
    return tcs.Task;
}

В С# 5 вы можете использовать его следующим образом:

private async Task MyMethod()
{
    ...
    await WaitFirstMyEvent(foo, cancellationToken);
    ...
}

Если вы хотите дождаться события синхронно, вы также можете использовать метод Wait:

private void MyMethod()
{
    ...
    WaitFirstMyEvent(foo, cancellationToken).Wait();
    ...
}

Здесь более общая версия, но она работает только для событий с Action сигнатурой:

public Task WaitFirstEvent(
    Action<Action> subscribe,
    Action<Action> unsubscribe,
    CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        unsubscribe(handler);
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        unsubscribe(handler);
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    subscribe(handler);
    return tcs.Task;
}

Вы можете использовать его следующим образом:

await WaitFirstEvent(
        handler => foo.MyEvent += handler,
        handler => foo.MyEvent -= handler,
        cancellationToken);

Если вы хотите, чтобы он работал с другими сигнатурами событий (например, EventHandler), вам придется создавать отдельные перегрузки. Я не думаю, что есть простой способ заставить его работать для любой подписи, тем более, что количество параметров не всегда одинаково.

Ответ 3

большое спасибо! для того, чтобы помочь другим понять... (возможно, показывать код последовательного устройства с кодом обработчика действий попаданий)

Вы также можете поместить ограничение общего типа, добавив что-то вроде

 where TEvent : EventArgs

в моем случае мне также нужен результат вне события в "официанте", поэтому я изменил подпись, как

 public static bool WaitForSingleEventWithResult<TEvent, TObjRes>(
            this CancellationToken token,
            Func<TEvent, TObjRes> onEvent,
             ...

называя это таким образом

        var ct = new CancellationToken();
        object result;
        bool eventOccurred = ct.WaitForSingleEventWithResult<MyEventArgs, object>(
            onEvent: statusPacket => result = this.OnStatusPacketReceived(statusPacket),
            subscribe: sub => cp.StatusPacketReceived_Action += sub,
            unsubscribe: unsub => cp.StatusPacketReceived_Action -= unsub,
            msTimeout: 5 * 1000,
            initializer: /*() => serialDevice.RequestStatusPacket()*/null);

в любом случае... большое спасибо!