Подтвердить что ты не робот

RxJS: как не подписаться на начальное значение и/или undefined?

Являясь новичком в RxJS, я часто создаю тему, которая хранит значения в будущем, но изначально undefined. Это может быть только undefined в первый раз. В настоящее время я использую filter для пропуска значений undefined, но это довольно громоздко, поскольку я делаю это везде, поскольку мне нужно только один раз. (Может быть, я здесь что-то не так?) Могу ли я как-то подписаться на mySubject только после того, как он получил свое первое значение через onNext?

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.filter(function(value) {
  return value !== undefined;
}).subscribe(function(value) {
  // do something with the value
});
4b9b3361

Ответ 1

Используйте new Rx.ReplaySubject(1) вместо BehaviorSubject.

Ответ 2

Как указано Will, вы сможете просто пропустить первое значение с помощью оператора пропуска:

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.skip(1).subscribe(function(value) {
  // do something with the value
});

Ответ 3

mySubject.pipe( skipWhile( v => !v ) );

Ответ 4

Пока я использую оператор filter, но я не знаю, хорошее ли это решение:

var mySubject = new Rx.BehaviorSubject().filter(x => !!x);

mySubject.subscribe(value => { /* will receive value from below */);

mySubject.next('value');

mySubject.subscribe(value => { /* also receives the value */ });

Ответ 5

Иногда возникает необходимость в поведении поведения, где начальное значение не имеет значения, а текущее значение требуется асинхронно при работе в потоке. В нашем случае несколько цепных обещаний обрабатываются с отменой пользователем во время обработки или во время извлечения данных из любого места. внутри потока.

Это может быть достигнуто с помощью следующего способа.

// for user related commands
this.commandSource = new BehaviorSubject(CONTINUE);
// filtering over initial value which is continue to make it as a different pipe
const stopPipe = commandSource.pipe(filter(val => val === STOP));
const fetchStream = Observable.fromPromise(this.fetchDetails);

merge(fetchStream, stopPipe).pipe(
 take(1),
 takeWhile(() => commandSource.value === CONTINUE),
 concatMap((response) => {
  // fetch Another response you can return promise directly in concatMap
  // return array of response [1 ,2 ,3];
  return this.fetchYetAnotherDetails;
 }),
 // we can add this to stop stream in multiple places while processing the response
 takeWhile(() => commandSource.value === CONTINUE),
 // triggers parallelly values from the concatMap that is 1, 2 , 3
 mergeMap(() => // massage the response parallelly using )
 finalize(() => thi
  commandSource.complete())
).subscribe(res => {
 // handle each response 1, 2, 3 mapped
}, () => {
 // handle error
}, () => {
 // handle complete of the stream
});

// when user, clicks cancel, this should stop the stream.
commandSource.next(STOP)

Ответ 6

Я нашел это разочарование в RxJS и RxSwift. (Желание ценного предмета в сочетании с умением ждать первого значения).

Что касается JS, я в настоящее время просто убираю отфильтрованную версию в теме, что-то вроде этого:

    let mySubject = new Rx.BehaviorSubject();
    mySubject.wait = mySubject.pipe(filter(v=>v!==undefined));

Таким образом, тема все еще доступна для публикации, но клиентам не нужно повторять фильтр.

    mySubject.wait.subscribe((v)=>{...});