Подтвердить что ты не робот

Пример потока данных TPL и Rx Комбинированный пример

Я просто хочу узнать, как и как использовать их вместе. Я понимаю, что они могут дополнять друг друга, я просто не мог найти пример того, кто на самом деле это делает.

4b9b3361

Ответ 1

Позвольте мне начать с немного фона.

.NET framework имеет ряд специальных типов - либо подходящие классы, либо интерфейсы - Task<T>, IObservable<T>, Nullable<T>, IEnumerable<T>, Lazy<T> и т.д. - которые обеспечивают особые полномочия для базового типа T.

TPL использует Task<T> для представления асинхронного вычисления одного значения T.

Rx использует IObservable<T> для представления асинхронного вычисления нуля или более значений T.

Это асинхронный вычислительный аспект обоих из них, который объединяет TPL и Rx.

Теперь TPL также использует тип Task для представления асинхронного выполнения Action лямбда, но это можно считать частным случаем Task<T>, где T - void. Очень похоже на стандартный метод в С# возвращает void так:

public void MyMethod() { }

Rx также допускает тот же специальный случай с использованием специального типа, называемого Unit.

Разница между TPL и Rx заключается в числе возвращаемых значений. TPL является одним и только одним, тогда как Rx равно нулю или более.

Итак, если вы обрабатываете Rx особым образом, только работая с наблюдаемыми последовательностями, которые возвращают одно значение, вы можете сделать некоторые вычисления аналогично TPL.

Например, в TPL я мог бы написать:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

И в Rx эквивалент будет:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

Я мог бы сделать еще один шаг в Rx, указав, что TPL должен использоваться для выполнения вычисления следующим образом:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

(По умолчанию используется пул потоков.)

Теперь я мог бы сделать несколько "смешивания и сопоставления". Если я добавлю ссылку на пространство имен System.Reactive.Threading.Tasks, я могу легко перемещаться между задачами и наблюдаемыми.

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

Обратите внимание на вызовы ToObservable() и .ToTask() и полученные смены из одной библиотеки в другую.

Если у меня есть наблюдаемое, которое возвращает более одного значения, я могу использовать наблюдаемый метод расширения .ToArray(), чтобы превратить несколько значений последовательности в одно значение массива, которое можно превратить в задачу. Например:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));

Я думаю, что это довольно простой ответ на ваш вопрос. Это то, чего вы ожидали?