Подтвердить что ты не робот

Как избежать повторного использования с обработчиками событий async void?

В приложении WPF у меня есть класс, который получает сообщения по сети. Всякий раз, когда объект указанного класса получает полное сообщение, возникает событие. В MainWindow приложения у меня есть обработчик событий, подписанный на это событие. Обработчик событий гарантированно будет вызываться в потоке графического интерфейса приложения.

Всякий раз, когда вызывается обработчик событий, содержимое сообщения должно быть применено к модели. Это может быть довольно дорого ( > 200 мс на текущем оборудовании). Поэтому применение этого сообщения выгружается в пул потоков с помощью Task.Run.

Теперь сообщения могут быть получены с очень близкой последовательностью, поэтому обработчик события можно вызвать, пока предыдущее изменение все еще обрабатывается. Каков самый простой способ гарантировать, что сообщения применяются только по одному? До сих пор я придумал следующее:

using System;
using System.Threading.Tasks;
using System.Windows;

public partial class MainWindow : Window
{
    private Model model = new Model();
    private Task pending = Task.FromResult<bool>(false);

    // Assume e carries a message received over the network.
    private void OnMessageReceived(object sender, EventArgs e)
    {
        this.pending = ApplyToModel(e);
    }

    private async Task ApplyToModel(EventArgs e)
    {
        await this.pending;
        await Task.Run(() => this.model.Apply(e)); // Assume this is an expensive call.
    }
}

Кажется, что это работает так, как ожидалось, однако также кажется, что это неизбежно приведет к "утечке памяти", поскольку задача применения сообщения всегда будет всегда ждать выполнения задачи, которая применяла предыдущее сообщение. Если это так, то следующее изменение должно избегать утечки:

private async Task ApplyToModel(EventArgs e)
{
    if (!this.pending.IsCompleted)
    {
        await this.pending;
    }

    await Task.Run(() => this.model.Apply(e));
}

Является ли это разумным способом избежать повторного размещения с обработчиками событий async void?

EDIT. Удалено ненужное выражение await this.pending; в OnMessageReceived.

EDIT 2: сообщения должны применяться к модели в том же порядке, в котором они были получены.

4b9b3361

Ответ 1

Мы должны поблагодарить Stephen Toub здесь, поскольку у него есть некоторые очень полезные асинхронные блокировки, продемонстрированные в блоге, включая блок блокировки асинхронизации.

Вот код этой статьи (включая код из предыдущей статьи в серии):

public class AsyncLock
{
    private readonly AsyncSemaphore m_semaphore;
    private readonly Task<Releaser> m_releaser;

    public AsyncLock()
    {
        m_semaphore = new AsyncSemaphore(1);
        m_releaser = Task.FromResult(new Releaser(this));
    }

    public Task<Releaser> LockAsync()
    {
        var wait = m_semaphore.WaitAsync();
        return wait.IsCompleted ?
            m_releaser :
            wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                this, CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    public struct Releaser : IDisposable
    {
        private readonly AsyncLock m_toRelease;

        internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

        public void Dispose()
        {
            if (m_toRelease != null)
                m_toRelease.m_semaphore.Release();
        }
    }
}

public class AsyncSemaphore
{
    private readonly static Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waiters = new Queue<TaskCompletionSource<bool>>();
    private int m_currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
        m_currentCount = initialCount;
    }
    public Task WaitAsync()
    {
        lock (m_waiters)
        {
            if (m_currentCount > 0)
            {
                --m_currentCount;
                return s_completed;
            }
            else
            {
                var waiter = new TaskCompletionSource<bool>();
                m_waiters.Enqueue(waiter);
                return waiter.Task;
            }
        }
    }
    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (m_waiters)
        {
            if (m_waiters.Count > 0)
                toRelease = m_waiters.Dequeue();
            else
                ++m_currentCount;
        }
        if (toRelease != null)
            toRelease.SetResult(true);
    }
}

Теперь применим его к вашему делу:

private readonly AsyncLock m_lock = new AsyncLock();

private async void OnMessageReceived(object sender, EventArgs e)
{
    using(var releaser = await m_lock.LockAsync()) 
    {
        await Task.Run(() => this.model.Apply(e));
    }
}

Ответ 2

Для обработчика событий, который использует async wait, мы не можем использовать блокировку вне задачи потому что вызывающий поток одинаковый для каждого вызова события, поэтому блокировка всегда позволяет ему проходить.

var object m_LockObject = new Object();

private async void OnMessageReceived(object sender, EventArgs e)
{
    // Does not work
    Monitor.Enter(m_LockObject);

    await Task.Run(() => this.model.Apply(e));

    Monitor.Exit(m_LockObject);
}

Но мы можем заблокировать внутри задачи, потому что Task.Run всегда генерирует новую задачу, которая не запускается параллельно в одном и том же потоке

var object m_LockObject = new Object();

private async void OnMessageReceived(object sender, EventArgs e)
{
    await Task.Run(() => 
    {
        // Does work
        lock(m_LockObject)
        {
            this.model.Apply(e);
        }
    });
}

Поэтому, когда событие вызывает OnMessageReceived, он возвращает immidiatly и model.Apply вводится только один за другим.