Подтвердить что ты не робот

Как запустить и остановить интервал, наблюдаемый в RXJS?

У меня очень простое времяInterval наблюдаемое, и я хочу начать/остановить передачу без отключения подписчиков (которые должны сидеть и ждать независимо от наблюдаемого статуса). Возможно, и если да, то как?

var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .take(10);

  var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
  });

Общий комментарий: большинство примеров, которые я видел, показывают, как определять наблюдаемые и подписчиков. как влиять на поведение существующих объектов?

4b9b3361

Ответ 1

Зависит от того, что является источником сигнала остановки/возобновления. Самый простой способ, о котором я могу думать, - это pausable operator, который, как говорится в документации, лучше работает с горячими наблюдаемыми. Итак, в следующем примере кода я удалил take(10) (ваш подозрительный сигнал теперь проходит через объект pauser) и добавлен share, чтобы превратить ваш наблюдаемый в горячий.

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share()
  .pausable(pauser);

var subscription = source.subscribe(
  function (x) {
     $("#result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#result").append('Error: ' + err);
  },
  function () {
    $("#result").append('Completed');
});

  // To begin the flow
pauser.onNext(true); // or source.resume();

// To pause the flow at any point
pauser.onNext(false);  // or source.pause();

Вот более сложный пример, который остановит ваш источник каждые 10 элементов:

// Helper functions
function emits ( who, who_ ) {return function ( x ) {
 who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

var pauser = new Rx.Subject();
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .map(function (x) { return x.value + ':' + x.interval; })
  .share();
var pausableSource = source
  .pausable(pauser);

source
  .scan(function (acc, _){return acc+1}, 0)
  .map(function(counter){return !!(parseInt(counter/10) % 2)})
  .do(emits(ta_validation, 'scan'))
  .subscribe(pauser);

var subscription = pausableSource.subscribe(
  function (x) {
     $("#ta_result").append('Next: ' + x + ' ');
  },
  function (err) {
    $("#ta_result").append('Error: ' + err);
  },
  function () {
    $("#ta_result").append('Completed');
});

Теперь у вас должен быть ваш ответ на второй вопрос. Объедините наблюдаемые вами данные с соответствующими операторами RxJS, чтобы реализовать свой прецедент. Вот что я здесь сделал.