Подтвердить что ты не робот

Представление асинхронных последовательностей в С# 5

Как использовать С# 5 async для представления последовательности асинхронных задач? Например, если мы хотим загрузить пронумерованные файлы с сервера и возвращать их по мере их получения, как мы можем реализовать такой метод?

public async IEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; i++; ) {
        yield return await DownloadFile(string.Format(format, i));
    }
}
4b9b3361

Ответ 1

Мне кажется, что вы хотите что-то очень похожее на BlockingCollection<T>, в котором вместо блокировки используются Task и await ing.

В частности, что-то, что вы можете добавить без блокировки или ожидания. Но когда вы пытаетесь удалить элемент, когда на данный момент его нет, вы можете await до тех пор, пока не будет доступен какой-либо элемент.

Открытый интерфейс может выглядеть так:

public class AsyncQueue<T>
{
    public bool IsCompleted { get; }

    public Task<T> DequeueAsync();

    public void Enqueue(T item);

    public void FinishAdding();
}

FinishAdding() необходимо, чтобы мы знали, когда заканчивать dequeuing.

При этом ваш код может выглядеть так (m_queue is AsyncQueue<File>):

var tasks = Enumerable.Range(0, 10)
    .Select(i => DownloadAndEnqueue(i))
    .ToArray();

Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());

…

static async Task DownloadAndEnqueue(string url)
{
    m_queue.Enqueue(await DownloadFile(url));
}

Это не так хорошо, как вы могли себе представить, но он должен работать.

И реализация AsyncQueue<T>? Есть две очереди. Один из них - для завершенной работы, которая еще не была удалена. Другой - для Task (фактически, TaskCompletionSource<T>), которые уже были отменены, но которые пока не имеют результата.

Когда вы удаляете и завершаете работу в очереди, просто верните работу оттуда (используя Task.FromResult()). Если очередь пуста, создайте новую Task, добавьте ее в другую очередь и верните ее.

Когда вы завершаете какую-то завершенную работу, а в очереди есть Task, удалите ее и завершите с помощью полученного результата. Если очередь Task пуста, добавьте работу в первую очередь.

С помощью этого вы можете удалить и вывести из очереди столько раз, сколько захотите, и он будет работать правильно. Когда вы знаете, что новой работы не будет, вызовите FinishAdding(). Если есть ожидающие Task s, они будут генерировать исключение.

Другими словами:

public class AsyncQueue<T>
{
    private readonly object m_lock = new object();

    private bool m_finishedAdding = false;

    private readonly Queue<T> m_overflowQueue = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
        new Queue<TaskCompletionSource<T>>();

    public bool IsCompleted
    {
        get { return m_finishedAdding && m_overflowQueue.Count == 0; }
    }

    public Task<T> DequeueAsync()
    {
        Task<T> result;
        lock (m_lock)
        {
            if (m_overflowQueue.Count > 0)
                result = Task.FromResult(m_overflowQueue.Dequeue());
            else if (!m_finishedAdding)
            {
                var tcs = new TaskCompletionSource<T>();
                m_underflowQueue.Enqueue(tcs);
                result = tcs.Task;
            }
            else
                throw new InvalidOperationException();
        }
        return result;
    }

    public void Enqueue(T item)
    {
        lock (m_lock)
        {
            if (m_finishedAdding)
                throw new InvalidOperationException();

            if (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetResult(item);
            }
            else
                m_overflowQueue.Enqueue(item);
        }
    }

    public void FinishAdding()
    {
        lock (m_lock)
        {
            m_finishedAdding = true;

            while (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetException(new InvalidOperationException());
            }
        }
    }
}

Если вы хотите ограничить размер рабочей очереди (и, таким образом, ограничить производителей, если они слишком быстры), вы можете сделать Enqueue() return Task тоже, что потребует еще одну очередь.

Ответ 2

Истинная последовательность работает неправильно с async/await, потому что задачи возвращают только одно значение. Вам нужен фактический перечислимый тип, например IAsyncEnumerator<T> в Ix-Async (или AsyncEx). Конструкция IAsyncEnumerator<T> описана в этом видео Channel9.

Ответ 3

Я знаю, что это было какое-то время, но я написал что-то, чтобы тесно подражать "yield return" для async перечислений здесь. Не требуется сложный код.

Вы используете его как:

public IAsyncEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    return AsyncEnumerable.Create(async y =>
    {
        for (int i = 0; i++; ) {
            await y.YieldReturn(await DownloadFile(string.Format(format, i)));
        }
    };
}

Обычно я не рекламирую свой собственный код здесь, но это явно необходима функция в С# 6.0, поэтому, надеюсь, вы сочтете это полезным в С# 5.0, если вы все еще застряли в этом.

Ответ 4

Если у вас было только определенное количество URL-адресов, вы можете сделать это:

    public async Task<IEnumerable<File>> DownloadPictures()
    {
        const string format = "http://example.com/files/{0}.png";
        var urls = Enumerable.Range(0, 999).Select(i => String.Format(format, i));
        var tasks = urls.Select(u => DownloadFile(u));
        var results = Task.WhenAll(tasks);
        return await results;
    }

Ключ должен получить список задач, а затем вызвать Task.WhenAll в этом списке.

Ответ 5

Преимущество async заключается в том, что вызывающий метод может вызывать несколько операций блокировки параллельно и только блокировать, как только возвращается возвращаемое значение. Эта же возможность возможна в этом случае с помощью yield/return с использованием типа возврата IEnumerable<Task>.

public IEnumerable<Task<File>> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; i++; ) {
        yield return DownloadFileAsync(string.Format(format, i));
    }
}

Аналогично async/await метод вызова теперь может продолжать выполняться до тех пор, пока ему не понадобится следующее значение, в которое точка await/.Result может быть вызвана при следующей задаче. Следующий способ расширения демонстрирует это:

    public static IEnumerable<T> Results<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks)
            yield return task.Result;
    }

Если вызывающий метод хотел бы гарантировать, что все IEnumerable Task создаются и выполняются параллельно, может быть полезным метод расширения, такой как следующий (этот и вышеупомянутый метод, вероятно, уже находятся в стандартный lib):

    public static IEnumerable<T> ResultsParallel<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks.ToArray())
            yield return task.Result;
    }

Обратите внимание, что ответственность за беспокойство о том, что выполняется параллельно, передается вызывающему методу так же, как и при использовании async/await. В случае возникновения проблемы с созданием блокировки Task может быть создан способ расширения, такой как следующий:

    public static Task<IEnumerable<T>> ResultsAsync<T>(this IEnumerable<Task<T>> tasks)
    {
        var startedTasks = new ConcurrentQueue<Task<T>>();
        var writerTask = new Task(() =>
            {
                foreach (var task in tasks)
                {
                    startedTasks.Enqueue(task);
                }
            });
        writerTask.Start();

        var readerTask = new Task<IEnumerable<T>>(() =>
        {
            return ResultsSequential(startedTasks, () => writerTask.IsCompleted);
        });
        readerTask.Start();
        return readerTask;
    }

    private static IEnumerable<T> ResultsSequential<T>(ConcurrentQueue<Task<T>> tasks, Func<bool> isDone)
    {
        while (true)
        {
            Task<T> task;
            if (isDone.Invoke())
            {
                if (tasks.TryDequeue(out task))
                {
                    yield return task.Result;
                }
                else
                {
                    yield break;
                }
            } else if (tasks.TryDequeue(out task))
            {
                yield return task.Result;
            }
        }
    }

Эта реализация не очень эффективна. Эффективная реализация слишком велика, чтобы вписаться в поле.