Подтвердить что ты не робот

Как вернуть возвращаемый элемент при выполнении Task.WhenAny

У меня есть два проекта в моем решении: проект WPF и библиотека классов.

В моей библиотеке классов:

У меня есть Список Символов:

class Symbol
{
     Identifier Identifier {get;set;}
     List<Quote> HistoricalQuotes {get;set;}
     List<Financial> HistoricalFinancials {get;set;}
}

Для каждого символа я запрашиваю финансовую услугу для получения исторических финансовых данных для каждого из моих символов, используя веб-запрос. (WebClient.DownloadStringTaskAsync(URI);)

Итак, вот мой метод, который делает это:

    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method return type is a Task of something
            yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

Как вы можете видеть, я хочу, чтобы каждый раз, когда я загружаю финансовые исторические данные на символ, чтобы получить результат, а не ждать, пока все мои вызовы будут завершены.

И в моем WPF, вот что я хотел бы сделать:

foreach(var symbol in await _service.GetSymbolsAsync())
{
      SymbolsObservableCollection.Add(symbol);
}

Кажется, мы не можем дать доход в асинхронном методе, то какое решение я могу использовать? За исключением перемещения метода GetSymbols в мой проект WPF.

4b9b3361

Ответ 1

В то время как мне нравятся компоненты TPL Dataflow (которые, как подсказывает, вы используете), переход к этой системе требует значительных обязательств - это не то, что вы можете просто добавить к существующему дизайну. Он предлагает значительные преимущества, если вы выполняете большие объемы обработки данных с интенсивным использованием процессора и хотите использовать многие ядра ЦП. Но получить лучшее из этого нетривиально.

Его другое предложение, используя Rx, может быть проще интегрировать с существующим решением. (См. оригинальную документацию, но для последнего кода используйте Rx-Main пакет nuget. Или если вы хотели бы посмотреть на источник, см. сайт Rx CodePlex). Возможно, даже если код вызова будет продолжен, используя IEnumerable<Symbol>, если вы хотите - вы можете использовать Rx только как деталь реализации, [ edit 2013/11/09, чтобы добавить:], хотя, как указывает svick, это, вероятно, не очень хорошая идея, учитывая вашу конечную цель.

Прежде чем я покажу вам пример, я хочу четко сказать, что именно мы делаем. В вашем примере был метод с этой сигнатурой:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync()

Этот тип возвращаемого значения Task<IEnumerable<Symbol>>, по существу, говорит: "Это метод, который производит единственный результат типа IEnumerable<Symbol>, и он может не сразу произвести этот результат".

Это тот единственный бит результата, который, как я думаю, вызывает у вас горе, потому что это не то, что вы хотите. A Task<T> (независимо от того, что T может быть) представляет собой одиночную асинхронную операцию. Он может иметь много шагов (многие используют await, если вы реализуете его как метод С# async), но в конечном итоге он производит одно. Вы хотите создавать несколько вещей в разное время, поэтому Task<T> не подходит.

Если бы вы действительно собирались сделать то, что ваша сигнатура метода promises - в конечном итоге произведет один результат - один способ, которым вы могли бы это сделать, - это создать метод async для создания списка, а затем произвести это как результат, когда он будет хорошим и готовым

// Note: this first example is *not* what you want.
// However, it is what your method signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    var results = new List<Symbol>();
    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    }

    return results;
}

Этот метод делает то, что говорит его подпись: он асинхронно создает последовательность символов.

Но предположительно вы хотели бы создать IEnumerable<Symbol>, который будет создавать элементы по мере их появления, а не ждать, пока они не будут доступны. (В противном случае вы могли бы просто использовать WhenAll.) Вы можете это сделать, но yield return не подходит.

Короче говоря, я думаю, что вы хотите сделать, это создать асинхронный список. Там тип для этого: IObservable<T> выражает то, что, как я полагаю, вы надеялись выразить с помощью Task<IEnumerable<Symbol>>: это последовательность элементов (точно так же, как IEnumerable<T>), но асинхронная.

Это может помочь понять это по аналогии:

public Symbol GetSymbol() ...

имеет значение

public Task<Symbol> GetSymbolAsync() ...

а

public IEnumerable<Symbol> GetSymbols() ...

:

public IObservable<Symbol> GetSymbolsObservable() ...

(К сожалению, в отличие от Task<T> не существует общего соглашения об именах для вызова асинхронного метода, ориентированного на последовательность. Я добавил здесь "Observable", но это не универсальная практика. не назовет его GetSymbolsAsync, потому что люди ожидают, что вернут Task.)

Другими словами, Task<IEnumerable<T>> говорит: "Я создам эту коллекцию, когда буду хорош и готов", тогда как IObservable<T> говорит: "Вот коллекция. Я буду производить каждый элемент, когда я буду хорош и готов."

Итак, вам нужен метод, который возвращает последовательность объектов Symbol, где эти объекты создаются асинхронно. Это говорит нам, что вы действительно должны возвращать IObservable<Symbol>. Вот реализация:

// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
    return Observable.Create<Symbol>(async obs =>
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
        }
    });
}

Как вы можете видеть, это позволяет писать в значительной степени то, что вы надеялись написать, - тело этого кода почти идентично вашему. Единственное различие заключается в том, что, когда вы использовали yield return (который не компилировался), это вызывает метод OnNext для объекта, предоставленного Rx.

Написав это, вы можете легко обернуть это в IEnumerable<Symbol> ([ Edited 2013/11/29, чтобы добавить:], хотя вы, вероятно, на самом деле не хотите этого делать - см. дополнение в конце ответа):

public IEnumerable<Symbol> GetSymbols()
{
    return GetSymbolsRx().ToEnumerable();
}

Это может выглядеть несинхронно, но на самом деле позволяет асинхронно работать с базовым кодом. Когда вы вызываете этот метод, он не будет блокироваться - даже если базовый код, который выполняет работу по извлечению финансовой информации, не может сразу произвести результат, этот метод, тем не менее, немедленно вернет IEnumerable<Symbol>. Теперь, конечно, любой код, который пытается выполнить итерацию через эту коллекцию, в конечном итоге блокируется, если данные еще не доступны. Но критически важно то, что я думаю, что вы изначально пытались достичь:

  • Вы можете написать метод async, который выполняет работу (делегат в моем примере передан как аргумент Observable.Create<T>, но вы можете написать автономный async метод, если хотите)
  • Вызывающий код не будет заблокирован просто в результате запроса начать загрузку символов
  • Получаемый IEnumerable<Symbol> будет производить каждый отдельный элемент, как только он станет доступен

Это работает, потому что метод Rx ToEnumerable имеет в себе некоторый умный код, который соединяет разрыв между синхронным мировоззрением IEnumerable<T> и асинхронным производством результатов. (Другими словами, это делает именно то, что вы были разочарованы, обнаружив, что С# не смог сделать для вас.)

Если вам интересно, вы можете посмотреть на источник. Код, который лежит в основе того, что ToEnumerable можно найти на https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs

[ Отредактировано 2013/11/29, чтобы добавить:]

svick указал в комментариях, что я пропустил: ваша конечная цель - поместить содержимое в ObservableCollection<Symbol>. Как-то я этого не видел. Это означает, что IEnumerable<T> - неправильный путь - вы хотите заполнить коллекцию, поскольку элементы становятся доступными, а не выполняются с помощью цикла foreach. Поэтому вы просто сделаете это:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

или что-то в этом роде. Это добавит элементы в коллекцию по мере их появления.

Это зависит от того, что все началось с потока пользовательского интерфейса. Если это так, ваш асинхронный код должен работать в потоке пользовательского интерфейса, что означает, что когда элементы добавляются в коллекцию, это также происходит в потоке пользовательского интерфейса. Но если по какой-то причине вы в конечном итоге запускаете вещи из рабочего потока (или если вы будете использовать ConfigureAwait для любого из ожидающих, тем самым нарушая соединение с потоком пользовательского интерфейса), вам нужно будет организовать обработку элементов из поток Rx в правом потоке:

GetSymbolsRx()
    .ObserveOnDispatcher()
    .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

Если вы находитесь в потоке пользовательского интерфейса, когда вы это делаете, он подберет текущего диспетчера и обеспечит его получение. Если вы подписались на неправильный поток, вы можете использовать перегрузку ObserveOn, которая принимает диспетчера. (Для этого требуется, чтобы у вас была ссылка на System.Reactive.Windows.Threading. И это методы расширения, поэтому для их содержащего пространства имен вам понадобится using, который также называется System.Reactive.Windows.Threading)

Ответ 2

То, о чем вы просите, не имеет большого смысла, потому что IEnumerable<T> - это синхронный интерфейс. Другими словами, если элемент еще не доступен, метод MoveNext() должен блокироваться, у него нет другого выбора.

Вам нужна какая-то асинхронная версия IEnumerable<T>. Для этого вы можете использовать IObservable<T> из Rx или (мой любимый) блок из потока данных TPL. При этом ваш код может выглядеть так (я также изменил некоторые переменные на лучшие имена):

public IReceivableSourceBlock<Symbol> GetSymbolsAsync()
{
    var block = new BufferBlock<Symbol>();

    GetSymbolsAsyncCore(block).ContinueWith(
        task => ((IDataflowBlock)block).Fault(task.Exception),
        TaskContinuationOptions.NotOnRanToCompletion);

    return block;
}

private async Task GetSymbolsAsyncCore(ITargetBlock<Symbol> block)
{
    // snip

    while (historicalFinancialTasks.Count > 0)
    {
        var historicalFinancialTask =
            await Task.WhenAny(historicalFinancialTasks);
        historicalFinancialTasks.Remove(historicalFinancialTask);
        var historicalFinancial = historicalFinancialTask.Result;

        var symbol = new Symbol(
            historicalFinancial.Symbol.Identifier,
            historicalFinancial.Symbol.HistoricalQuotes,
            historicalFinancial.Data);

        await block.SendAsync(symbol);
    }
}

И использование может быть:

var symbols = _service.GetSymbolsAsync();
while (await symbols.OutputAvailableAsync())
{
    Symbol symbol;
    while (symbols.TryReceive(out symbol))
        SymbolsObservableCollection.Add(symbol);
}

Или:

var symbols = _service.GetSymbolsAsync();
var addToCollectionBlock = new ActionBlock<Symbol>(
   symbol => SymbolsObservableCollection.Add(symbol));
symbols.LinkTo(
   addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true });
await symbols.Completion;

Ответ 4

Почему бы не сделать что-то вроде этого:

public async IEnumerable<Task<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
    }
}