Подтвердить что ты не робот

Секвенирование и реорганизация задач

У меня есть следующий сценарий, который, я думаю, может быть довольно распространенным:

  • Существует задача (обработчик команд UI), которая может выполняться синхронно или асинхронно.

  • Команды могут прибывать быстрее, чем они обрабатываются.

  • Если для команды уже есть ожидающая задача, задача нового обработчика команд должна быть поставлена ​​в очередь и обрабатываться последовательно.

  • Каждый новый результат задачи может зависеть от результата предыдущей задачи.

Отмена должна быть соблюдена, но я хотел бы оставить ее вне сферы применения этого вопроса для простоты. Кроме того, безопасность потоков (concurrency) не является обязательным требованием, но требуется поддержка повторного входа.

Вот базовый пример того, чего я пытаюсь достичь (как консольное приложение, для простоты):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int, Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("\nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("\nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

Выход:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

Он работает в соответствии с требованиями, пока повторное включение не будет введено в тесте № 2:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

Желаемый результат должен быть 100, 200, а не 200, 100, потому что уже существует ожидающая внешняя задача для 100. Это очевидно потому, что внутренняя задача выполняется синхронно, нарушая логику var pending = _pending; /* ... */ _pending = wrapper() для внешней задачи.

Как заставить его работать и для теста # 2?

Одним из решений было бы обеспечить асинхронность для каждой задачи с помощью Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext(). Однако я не хочу налагать асинхронное выполнение на обработчики команд, которые могут быть синхронными внутри. Кроме того, я не хочу зависеть от поведения какого-либо конкретного контекста синхронизации (т.е. Полагаясь на то, что Task.Factory.StartNew должен вернуться до того, как созданная задача была фактически запущена).

В реальном проекте я отвечаю за то, что AsyncOp выше, но не имеет никакого контроля над обработчиками команд (т.е. что находится внутри handleAsync).

4b9b3361

Ответ 1

Я почти забыл, что можно построить Task вручную, без запуска или планирования. Затем "Task.Factory.StartNew" против новой задачи (...). Начать " вернуть меня в нужное русло. Я думаю, что это один из тех немногих случаев, когда конструктор Task<TResult> может быть полезен вместе с вложенными задачами (Task<Task<T>>) и Task.Unwrap():

// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("\nprev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

Выход:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800
this task arg: 100

prev task result:  100
this task arg: 200

Теперь также очень легко сделать AsyncOp потокобезопасным, добавив lock для защиты _pending, если это необходимо.


Обновлено, ниже приведена самая последняя версия этого шаблона, которая использует TaskCompletionSource и является потокобезопасной:

/// <summary>
/// AsyncOperation
/// By Noseratio - http://stackoverflow.com/a/21427264
/// </summary>
/// <typeparam name="T">Task result type</typeparam>
class AsyncOperation<T>
{
    readonly object _lock = new Object();
    Task<T> _currentTask = null;
    CancellationTokenSource _currentCts = null;

    // a client of this class (e.g. a ViewModel) has an option 
    // to handle TaskSucceeded or TaskFailed, if needed
    public event EventHandler<TaskEventArgs> TaskSucceeded = null;
    public event EventHandler<TaskEventArgs> TaskFailing = null;

    public Task<T> CurrentTask
    {
        get
        {
            lock (_lock)
                return _currentTask;
        }
    }

    public bool IsCurrent(Task task)
    {
        lock (_lock)
            return task == _currentTask;
    }

    public bool IsPending
    {
        get
        {
            lock (_lock)
                return _currentTask != null && !_currentTask.IsCompleted;
        }
    }

    public bool IsCancellationRequested
    {
        get
        {
            lock (_lock)
                return _currentCts != null && _currentCts.IsCancellationRequested;
        }
    }

    public void Cancel()
    {
        lock (_lock)
        {
            if (_currentTask != null && !_currentTask.IsCompleted)
                _currentCts.Cancel();
        }
    }

    /// <summary>
    /// Start the task routine and observe the result of the previous task routine
    /// </summary>
    /// <param name="routine"></param>
    /// <param name="token"></param>
    /// <param name="cancelPrevious"></param>
    /// <param name="throwImmediately"></param>
    public Task<T> StartAsync(
        Func<CancellationToken, Task<T>> routine,
        CancellationToken token,
        bool cancelPrevious = true,
        bool throwImmediately = true)
    {
        Task<T> previousTask = null; // pending instance
        CancellationTokenSource previousCts = null; // pending instance CTS

        CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task
        CancellationToken thisToken; // this task cancellation Token
        Task<T> routineTask = null; // as returned by routine

        lock (_lock)
        {
            // remember the _currentTask as previousTask
            previousTask = _currentTask;
            previousCts = _currentCts;

            thisToken = thisCts.Token;

            // set the new _currentTask 
            _currentTask = thisTcs.Task;
            _currentCts = thisCts;
        }

        Action startAsync = async () =>
        {
            // because startAsync is "async void" method, 
            // any exception not handled inside it
            // will be immediately thrown on the current synchronization context,
            // more details: http://stackoverflow.com/a/22395161/1768303

            // run and await this task
            try
            {
                // await the previous task instance
                if (previousTask != null)
                {
                    if (cancelPrevious)
                        previousCts.Cancel();
                    try
                    {
                        await previousTask;
                    }
                    catch (OperationCanceledException)
                    {
                        // ignore previous cancellations
                    }
                }

                thisToken.ThrowIfCancellationRequested();

                routineTask = routine(thisToken);
                await routineTask;
            }
            catch (Exception ex)
            {
                // ignore cancellation
                if (ex is OperationCanceledException)
                {
                    System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id);
                    thisTcs.SetCanceled();
                    return;
                }

                // fire TaskFailing
                System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id);
                if (this.TaskFailing != null)
                {
                    var args = new TaskEventArgs(thisTcs.Task, ex);
                    this.TaskFailing(this, args);
                    if (args.Handled)
                    {
                        // exception handled
                        // make thisTcs cancelled rather than faulted 
                        thisTcs.SetCanceled();
                        return;
                    }
                }

                // exception unhandled
                thisTcs.SetException(ex);
                if (throwImmediately)
                    throw; // rethrow on the current synchronization context

                // exception should be observed via CurrentTask.Exception
                return;
            }

            // success, fire TaskSucceeded
            System.Diagnostics.Debug.Print("Task succeded, id={0}", thisTcs.Task.Id);
            thisTcs.SetResult(routineTask.Result);
            if (this.TaskSucceeded != null)
                this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task));
        };

        startAsync();
        return thisTcs.Task;
    }

    // StartAsync with CancellationToken.None
    public Task<T> StartAsync(
        Func<CancellationToken, Task<T>> routine,
        bool cancelPrevious = true,
        bool throwImmediately = true)
    {
        return StartAsync(routine, CancellationToken.None, cancelPrevious: true, throwImmediately: true);
    }

    /// <summary>
    /// TaskEventArgs
    /// </summary>
    public class TaskEventArgs : EventArgs
    {
        public Task<T> Task { get; private set; }
        public Exception Exception { get; private set; }
        public bool Handled { get; set; }

        public TaskEventArgs(Task<T> task, Exception exception = null)
        {
            this.Task = task;
            this.Exception = exception;
        }
    }
}