Подтвердить что ты не робот

ObserveOn и SubscribeOn - где выполняется работа

Основываясь на чтении этого вопроса: В чем разница между SubscribeOn и ObserveOn

ObserveOn выполняется набор, в котором находится код в обработчике Subscribe:

stream.Subscribe(_ => { // this code here });

Метод SubscribeOn устанавливает, на какой поток выполняется настройка потока.

Я понял, что если они явно не установлены, используется TaskPool.

Теперь мой вопрос: скажем, я делаю что-то вроде этого:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

Где выполняются Where predicate и SelectMany lots_of, учитывая, что some_action выполняется на диспетчере?

4b9b3361

Ответ 1

Там много вводящей в заблуждение информации о SubscribeOn и ObserveOn.

Резюме

  • SubscribeOn перехватывает вызовы для единственного метода IObservable<T>, который Subscribe, и вызывает Dispose в возвращаемом дескрипторе IDisposable на Subscribe.
  • ObserveOn перехватывает вызовы методам IObserver<T>, которые OnNext, OnCompleted и OnError.
  • Оба метода вызывают соответствующие вызовы в указанном планировщике.

Анализ и демонстрации

Утверждение

ObserveOn устанавливает, где код в обработчике Subscribe выполняется:

более запутанным, чем полезным. То, что вы называете "обработчиком подписки", действительно является обработчиком OnNext. Помните, что метод Subscribe IObservable принимает IObserver, который имеет методы OnNext, OnCompleted и OnError, но это методы расширения, которые обеспечивают удобные перегрузки, которые принимают lambdas, и создают IObserver для вас.

Позвольте мне учесть этот термин; Я думаю, что обработчик "Подписаться" является кодом в наблюдаемом, который вызывается, когда вызывается Subscribe. Таким образом, приведенное выше описание более близко напоминает цель SubscribeOn.

SubscribeOn

SubscribeOn заставляет метод Subscribe наблюдаемого выполняться асинхронно в указанном планировщике или контексте. Вы используете его, когда вы не хотите вызывать метод Subscribe на наблюдаемом из любого потока, на котором вы работаете, - как правило, потому что он может быть длительным, и вы не хотите блокировать вызывающий поток.

Когда вы вызываете Subscribe, вы вызываете наблюдаемое, которое может быть частью длинной цепочки наблюдаемых. Это только наблюдаемое, что SubscribeOn применяется к тому, что оно воздействует. Теперь может случиться так, что все наблюдаемые в цепочке будут подписаны сразу и в том же потоке, но это не обязательно. Подумайте, например, о Concat, который подписывается только на каждый последовательный поток после завершения предыдущего потока, и обычно это будет происходить в любом потоке, который предшествует поток, называемый OnCompleted.

Итак SubscribeOn находится между вашим вызовом до Subscribe и наблюдаемым, на который вы подписаны, перехватывая вызов и делая его асинхронным.

Это также влияет на удаление подписки. Subscribe возвращает дескриптор IDisposable, который используется для отмены подписки. SubscribeOn гарантирует, что вызовы Dispose запланированы в поставляемом планировщике.

Общей точкой смятения при попытке понять, что делает SubscribeOn, является то, что обработчик Subscribe наблюдаемого вполне может вызвать OnNext, OnCompleted или OnError в этом же потоке. Однако его цель не влияет на эти вызовы. Это не редкость для завершения потока перед возвратом метода Subscribe. Observable.Return делает это, например. Давайте посмотрим.

Если вы используете метод Spy, который я написал, и запустите следующий код:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

Вы получаете этот вывод (идентификатор потока может варьироваться, конечно):

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

Вы можете видеть, что весь обработчик подписки работал в одном потоке и заканчивался перед возвратом.

Используйте SubscribeOn для запуска асинхронно. Мы будем шпионить как на наблюдаемом Return, так и на SubscribeOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

Эти выходы (добавленные мной номера строк):

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.

01 - Основной метод работает в потоке 1.

02 - наблюдаемый Return оценивается на вызывающем потоке. Мы просто получаем IObservable здесь, пока ничего не подписывается.

03 - наблюдаемый SubscribeOn оценивается в вызывающем потоке.

04 - Теперь, наконец, мы называем метод Subscribe SubscribeOn.

05 - Метод Subscribe завершается асинхронно...

06 -... и поток 1 возвращается к основному методу. Это действие SubscribeOn в действии!

07 - Между тем SubscribeOn запланировал вызов планировщика по умолчанию на Return. Здесь он получен в потоке 2.

08 - И как Return делает, он вызывает OnNext в потоке Subscribe...

09 - и SubscribeOn - это только проход.

10,11 - То же самое для OnCompleted

12 - И последнее завершено обработчик подписки Return.

Надеюсь, это очистит цель и эффект SubscribeOn!

ObserveOn

Если вы думаете о SubscribeOn как перехватчик для метода Subscribe, который передает вызов другому тегу, тогда ObserveOn выполняет одно и то же задание, но для OnNext, OnCompleted и OnError.

Вспомним наш оригинальный пример:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

Который дал этот результат:

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

Теперь измените это, чтобы использовать ObserveOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

Мы получаем следующий вывод:

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2

01 - Основной метод работает в Thread 1.

02 - Как и ранее, наблюдаемый Return оценивается на вызывающем потоке. Мы просто получаем IObservable здесь, пока ничего не подписывается.

03 - Наблюдаемый ObserveOn также оценивается в вызывающем потоке.

04 - Теперь мы снова подписываемся на вызывающий поток, сначала на ObserveOn наблюдаемый...

05 -... который затем передает вызов на Return наблюдаемый.

06 - Теперь Return вызывает OnNext в своем обработчике Subscribe.

07 - Вот эффект ObserveOn. Мы видим, что OnNext запланирован асинхронно на Thread 2.

08 - Между тем Return вызывает OnCompleted в Thread 1...

09 - И обработчик подписки Return завершен...

10 - и тогда делает обработчик подписки ObserveOn...

11 - поэтому управление возвращается основному методу

12 - Между тем, ObserveOn отправил Return OnCompleted вызов this в Thread 2. Это могло произойти в любое время в течение 09-11, потому что оно выполняется асинхронно. Так получилось, что он наконец-то позвонил.

Каковы типичные варианты использования?

Вы чаще всего будете видеть SubscribeOn, который используется в графическом интерфейсе, когда вам нужно Subscribe на длительный наблюдаемый и хотите как можно скорее выйти из потока диспетчера - возможно, потому, что вы знаете, что это одна из тех наблюдаемых, которые все это работает в обработчике подписки. Примените его в конце наблюдаемой цепочки, потому что это первый наблюдаемый, который вызывается при подписании.

Вы чаще всего увидите ObserveOn, используемый в графическом интерфейсе, если вы хотите, чтобы вызовы OnNext, OnCompleted и OnError были перенаправлены обратно в поток диспетчера. Примените его в конце наблюдаемой цепи для перехода назад как можно позже.

Надеюсь, вы увидите, что ответ на ваш вопрос заключается в том, что ObserveOnDispatcher не будет иметь никакого значения для потоков, которые выполняются Where и SelectMany - все зависит от того, из какого потока поток их вызывает! обработчик потоковой подписки будет вызываться в вызывающем потоке, но невозможно сказать, где Where и SelectMany будут работать, не зная, как реализуется stream.

Наблюдаемые со временем жизни, которые переживают вызов Subscribe

До сих пор мы смотрели исключительно на Observable.Return. Return завершает свой поток в обработчике Subscribe. Это не атипично, но для потоков, которые, как правило, слишком много, чтобы пережить обработчик Subscribe. Посмотрите Observable.Timer, например:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");

Это возвращает следующее:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2

Вы можете четко видеть, что подписка завершена, а затем OnNext и OnCompleted вызывается позже в другом потоке.

Обратите внимание, что никакая комбинация SubscribeOn или ObserveOn не будет иметь никакого эффекта, по которому нить или планировщик Timer не хочет вызывать OnNext и OnCompleted on.

Конечно, вы можете использовать SubscribeOn для определения потока Subscribe:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

(я намеренно меняю на NewThreadScheduler здесь, чтобы предотвратить путаницу в случае Timer, чтобы получить тот же поток потока потока, что и SubscribeOn)

Дарение:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3

Здесь вы можете четко видеть основной поток в потоке (1), возвращающийся после его вызовов Subscribe, но подписка Timer получает свой собственный поток (2), но вызовы OnNext и OnCompleted работают на нить (3).

Теперь для ObserveOn, измените код на (для тех, которые следуют в коде, используйте пакет nuget rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

Этот код немного отличается. Первая строка гарантирует, что у нас есть диспетчер, а также ObserveOnDispatcher - это похоже на ObserveOn, за исключением того, что он указывает, что мы должны использовать DispatcherScheduler любого потока ObserveOnDispatcher.

Этот код дает следующий результат:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1

Обратите внимание, что диспетчер (и основной поток) - это поток 1. Timer по-прежнему вызывает OnNext и OnCompleted в потоке по его выбору (2), но ObserveOnDispatcher выполняет сортировку вызовов на поток диспетчера, резьба (1).

Также обратите внимание, что если бы мы заблокировали поток диспетчера (например, Thread.Sleep), вы увидите, что блок ObserveOnDispatcher будет блокироваться (этот код лучше всего работает в основном методе LINQPad):

var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");

И вы увидите вывод следующим образом:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1

С помощью вызовов через ObserveOnDispatcher можно выйти только после запуска Sleep.

Ключевые моменты

Полезно иметь в виду, что Reactive Extensions - это, по сути, библиотека с бесплатной резьбой, и пытается быть настолько ленивым, насколько возможно, о том, в какой поток она работает, - вы должны сознательно вмешиваться в ObserveOn, SubscribeOn и передавать конкретные планировщики для операторов, которые принимают их, чтобы изменить это.

Ни один потребитель наблюдаемого не может сделать, чтобы контролировать, что он делает внутри - ObserveOn и SubscribeOn являются декораторами, которые обертывают площадь наблюдателей и наблюдаемых для маршальных вызовов через нитки. Надеюсь, эти примеры сделали это понятным.

Ответ 2

Я нашел Джеймса очень ясным и всеобъемлющим. Однако, несмотря на это, мне все еще приходится объяснять различия.

Поэтому я создал очень простой/глупый пример, который позволяет мне наглядно продемонстрировать, какие функции планировщиков вызывают. Я создал класс MyScheduler, который немедленно выполняет действия, но изменит цвет консоли.

Текстовый вывод планировщика SubscribeOn выводится красным цветом и что из ObserveOn планировщик выводится синим цветом.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace SchedulerExample
{

    class Program
    {
        static void Main(string[] args)
        {
            var mydata = new[] {"A", "B", "C", "D", "E"};
            var observable = Observable.Create<string>(observer =>
                                            {
                                                Console.WriteLine("Observable.Create");
                                                return mydata.ToObservable().
                                                    Subscribe(observer);
                                            });

            observable.
                SubscribeOn(new MyScheduler(ConsoleColor.Red)).
                ObserveOn(new MyScheduler(ConsoleColor.Blue)).
                Subscribe(s => Console.WriteLine("OnNext {0}", s));

            Console.ReadKey();
        }
    }
}

Выводится:

scheduler

И для справки MyScheduler (не подходит для реального использования):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace SchedulerExample
{
    class MyScheduler : IScheduler
    {
        private readonly ConsoleColor _colour;

        public MyScheduler(ConsoleColor colour)
        {
            _colour = colour;
        }

        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            return Execute(state, action);
        }

        private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            var tmp = Console.ForegroundColor;
            Console.ForegroundColor = _colour;
            action(this, state);
            Console.ForegroundColor = tmp;
            return Disposable.Empty;
        }

        public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            throw new NotImplementedException();
        }

        public DateTimeOffset Now
        {
            get { return DateTime.UtcNow; }
        }
    }
}

Ответ 3

Я часто ошибаюсь, что .SubcribeOn используется для установки потока, где выполняется код внутри .Subscribe. Но помните, просто подумайте, что публикация и подписка должны быть такими же, как и инь-ян. Чтобы установить, где выполняется Subscribe code, используйте ObserveOn. Чтобы установить, где Observable code выполнено SubscribeOn. Или в резюме мантры: where-what, Subscribe-Observe, Observe-Subscribe.