Подтвердить что ты не робот

Как запустить задачу в пользовательском TaskScheduler, используя ожидание?

У меня есть некоторые методы, возвращающие Task<T>, по которым я могу await по желанию. Я бы хотел, чтобы эти задачи выполнялись в пользовательском TaskScheduler вместо стандартного.

var task = GetTaskAsync ();
await task;

Я знаю, что могу создать новый TaskFactory (new CustomScheduler ()) и сделать из него StartNew (), но StartNew () выполняет действие и создает Task, и у меня уже есть Task (возвращается за кулисами a TaskCompletionSource)

Как я могу указать свой собственный TaskScheduler для await?

4b9b3361

Ответ 1

Я думаю, что вы действительно хотите сделать Task.Run, но с помощью специального планировщика. StartNew не работает интуитивно с асинхронными методами; Стивен Тууб имеет отличное сообщение в блоге о различиях между Task.Run и TaskFactory.StartNew.

Итак, чтобы создать свой собственный Run, вы можете сделать что-то вроде этого:

private static readonly TaskFactory myTaskFactory = new TaskFactory(
    CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.None, new MyTaskScheduler());
private static Task RunOnMyScheduler(Func<Task> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
  return myTaskFactory.StartNew(func).Unwrap();
}
private static Task RunOnMyScheduler(Action func)
{
  return myTaskFactory.StartNew(func);
}
private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
  return myTaskFactory.StartNew(func);
}

Затем вы можете выполнять синхронные или асинхронные методы в своем настраиваемом планировщике.

Ответ 2

TaskCompletionSource<T>.Task построен без каких-либо действий и планировщик назначается при первом вызове ContinueWith(...) (из Асинхронного программирования с использованием Reactive Framework и библиотеки параллельных задач - Часть 3).

К счастью, вы можете немного настроить поведение await, реализовав свой собственный класс, производный от INotifyCompletion, а затем используя его в шаблоне, подобном await SomeTask.ConfigureAwait(false), для настройки планировщика, который задача должна начать использовать в методе OnCompleted(Action continuation) (из жду чего угодно;).

Вот использование:

    TaskCompletionSource<object> source = new TaskCompletionSource<object>();

    public async Task Foo() {
        // Force await to schedule the task on the supplied scheduler
        await SomeAsyncTask().ConfigureScheduler(scheduler);
    }

    public Task SomeAsyncTask() { return source.Task; }

Вот простая реализация ConfigureScheduler с использованием метода расширения Task с важной частью в OnCompleted:

public static class TaskExtension {
    public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
        return new CustomTaskAwaitable(task, scheduler);
    }
}

public struct CustomTaskAwaitable {
    CustomTaskAwaiter awaitable;

    public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
        awaitable = new CustomTaskAwaiter(task, scheduler);
    }

    public CustomTaskAwaiter GetAwaiter() { return awaitable; }

    public struct CustomTaskAwaiter : INotifyCompletion {
        Task task;
        TaskScheduler scheduler;

        public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
            this.task = task;
            this.scheduler = scheduler;
        }

        public void OnCompleted(Action continuation) {
            // ContinueWith sets the scheduler to use for the continuation action
            task.ContinueWith(x => continuation(), scheduler);
        }

        public bool IsCompleted { get { return task.IsCompleted; } }
        public void GetResult() { }
    }
}

Вот рабочий пример, который будет скомпилирован как консольное приложение:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Example {
    class Program {
        static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
        static TaskScheduler scheduler = new CustomTaskScheduler();

        static void Main(string[] args) {
            Console.WriteLine("Main Started");
            var task = Foo();
            Console.WriteLine("Main Continue ");
            // Continue Foo() using CustomTaskScheduler
            source.SetResult(null);
            Console.WriteLine("Main Finished");
        }

        public static async Task Foo() {
            Console.WriteLine("Foo Started");
            // Force await to schedule the task on the supplied scheduler
            await SomeAsyncTask().ConfigureScheduler(scheduler);
            Console.WriteLine("Foo Finished");
        }

        public static Task SomeAsyncTask() { return source.Task; }
    }

    public struct CustomTaskAwaitable {
        CustomTaskAwaiter awaitable;

        public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
            awaitable = new CustomTaskAwaiter(task, scheduler);
        }

        public CustomTaskAwaiter GetAwaiter() { return awaitable; }

        public struct CustomTaskAwaiter : INotifyCompletion {
            Task task;
            TaskScheduler scheduler;

            public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
                this.task = task;
                this.scheduler = scheduler;
            }

            public void OnCompleted(Action continuation) {
                // ContinueWith sets the scheduler to use for the continuation action
                task.ContinueWith(x => continuation(), scheduler);
            }

            public bool IsCompleted { get { return task.IsCompleted; } }
            public void GetResult() { }
        }
    }

    public static class TaskExtension {
        public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
            return new CustomTaskAwaitable(task, scheduler);
        }
    }

    public class CustomTaskScheduler : TaskScheduler {
        protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
        protected override void QueueTask(Task task) {
            TryExecuteTask(task);
        }
    }
}

Ответ 3

После комментариев похоже, что вы хотите управлять планировщиком, на котором запускается код после ожидания.

Компиляция создает продолжение от ожидания, который выполняется по текущему Синхронизационному Контексту по умолчанию. Поэтому лучше всего настроить SynchronizationContext перед вызовом.

Есть несколько способов подождать определенного контекста. См. Настроить ожидание от Jon Skeet, особенно часть SwitchTo, для получения дополнительной информации о том, как реализовать что-то вроде этого.

EDIT: Метод SwitchTo из TaskEx удален, так как это было слишком просто для неправильного использования. См. форум MSDN по причинам.

Ответ 4

Можете ли вы поместиться для вызова этого метода:

  await Task.Factory.StartNew(
        () => { /* to do what you need */ }, 
        CancellationToken.None, /* you can change as you need */
        TaskCreationOptions.None, /* you can change as you need */
        customScheduler);

Ответ 5

Невозможно встроить расширенные асинхронные функции в пользовательский TaskScheduler. Этот класс не был разработан с учетом async/await. Стандартный способ использования пользовательского TaskScheduler - это аргумент метода Task.Factory.StartNew. Этот метод не понимает асинхронные делегаты. Можно предоставить асинхронный делегат, но он обрабатывается как любой другой делегат, который возвращает некоторый результат. Чтобы получить ожидаемый результат асинхронного делегата, необходимо вызвать Unwrap() для возвращенной задачи. Это не проблема, хотя. Проблема в том, что инфраструктура TaskScheduler не рассматривает асинхронный делегат как единую единицу работы. Он разделяет каждую задачу на несколько мини-задач (используя каждый await в качестве разделителя), и каждая мини-задача обрабатывается индивидуально. Это серьезно ограничивает асинхронную функциональность, которая может быть реализована поверх этого класса. В качестве примера приведем пользовательский TaskScheduler, который предназначен для того, чтобы ставить поставленные задачи по очереди (для ограничения параллелизма, другими словами):

public class MyTaskScheduler : TaskScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    protected async override void QueueTask(Task task)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() => base.TryExecuteTask(task));
        }
        finally
        {
            _semaphore.Release();
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued) => base.TryExecuteTask(task);

    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}

SemaphoreSlim должен обеспечивать одновременную работу только одного Task. К сожалению, это не работает. Семафор высвобождается преждевременно, потому что Task, переданный в вызове QueueTask(task), - это не задача, которая представляет всю работу асинхронного делегата, а только часть до первого await. Другие части передаются методу TryExecuteTaskInline. Невозможно сопоставить эти части задачи, поскольку не предоставляется ни идентификатора, ни другого механизма. Вот что происходит на практике:

var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);

Выход:

05:29:58.346 Item 1 Started
05:29:58.358 Item 2 Started
05:29:58.358 Item 3 Started
05:29:58.358 Item 4 Started
05:29:58.358 Item 5 Started
05:29:59.358 Item 1 Finished
05:29:59.374 Item 5 Finished
05:29:59.374 Item 4 Finished
05:29:59.374 Item 2 Finished
05:29:59.374 Item 3 Finished

Бедствие, все задачи ставятся в очередь сразу.

Заключение: Настройка класса TaskScheduler - это не тот путь, когда требуются расширенные функции асинхронности.