Подтвердить что ты не робот

Как работает оператор RxJs 5 share()?

Не для меня 100% понятно, как работает оператор RxJs 5 share(), см. здесь последние документы. Jsbin для вопроса здесь.

Если я создаю наблюдаемое с серией от 0 до 2, каждое значение разделяется на одну секунду:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

И если я создам двух подписчиков для этого наблюдаемого:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

Я получаю это в консоли:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

Я думал, что каждая подписка будет подписана на тот же Observable, но, похоже, это не так! Его, как акт подписки, создает совершенно отдельный Наблюдаемый!

Но если оператор share() добавляется к наблюдаемому источнику:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

Тогда получим:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

Это то, чего я ожидал бы без share().

Что происходит здесь, как работает оператор share()? Создает ли каждая подписка новую цепочку Observable?

4b9b3361

Ответ 1

Будьте осторожны, если вы используете RxJS v5, в то время как ваша ссылка для документации выглядит как RxJS v4. Я не помню специфику, но я думаю, что оператор share прошел некоторые изменения, в частности, когда дело доходит до завершения и повторной подписки, но не верьте мне на слово.

Вернемся к вашему вопросу, как вы показали в своем исследовании, ваши ожидания не соответствуют дизайну библиотеки. Наблюдаемые лениво создают поток данных, конкретно инициируя поток данных, когда абонент подписывается. Когда второй абонент подписывается на одно и то же наблюдаемое, запускается другой новый поток данных, как если бы он был первым абонентом (так что да, каждая подписка создает новую цепочку наблюдаемых, как вы сказали). Это то, что придумано в терминологии RxJS как наблюдаемое в холоде, и что поведение по умолчанию для наблюдаемого RxJS. Если вы хотите, чтобы наблюдаемый, который отправляет свои данные подписчикам, которые он имеет в момент поступления данных, это придумано как горячая наблюдаемая, и одним из способов получить горячее наблюдаемое является использование оператора share.

Здесь вы можете найти иллюстрированные потоки подписки и данных: Горячие и холодные наблюдаемые: есть ли "горячие" и "холодные" операторы? (это справедливо для RxJS v4, но большая часть из них действительна для v5).

Ответ 2

делает наблюдаемые "горячие", если выполняются эти 2 условия:

  • количество подписчиков > 0
  • И наблюдаемое не завершено

Сценарий1: количество подписчиков > 0 и наблюдаемое не завершено до новой подписки

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

Сценарий 2: количество подписчиков равно нулю перед новой подпиской. Становится "холодным"

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2 onNext is called at startTime + 8 seconds
// observer2 onNext is called at startTime + 13 seconds

Сценарий 3: когда наблюдаемый был завершен до новой подписки. Становится "холодным"

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs