Подтвердить что ты не робот

Реактивное программирование - RxJS vs EventEmitter в Node.js

Недавно я начал искать RxJS и библиотеки RxJava (из Netflix), которые работают над концепцией реактивного программирования.

Node.js работает на основе циклов событий, который предоставляет вам весь арсенал для асинхронного программирования, а последующие библиотеки node, такие как "кластер", помогут вам лучше всего использовать вашу многоядерную машину. И Node.js также предоставляет вам функцию EventEmitter, где вы можете подписаться на события и действовать асинхронно.

С другой стороны, если я правильно понимаю, RxJS (и реактивное программирование в целом) работает по принципу потоков событий, подписываясь на потоки событий, асинхронно преобразуя данные потока событий.

Итак, вопрос в том, что означает использование пакетов Rx в Node.js. Насколько отличается цикл событий node, эмитент событий и подписки на потоки Rx и подписки.

4b9b3361

Ответ 1

Вы правы, что потоки Rx и EventEmitter очень похожи, оба являются реализациями шаблона Observer.

Различие заключается в том, что Rx содержит функции для преобразования и объединения потоков событий. Представьте себе, например, что мы хотим отложить каждое "событие ответа" на 2 секунды. С EventEmitter вам придется подписаться на это и вручную сделать тайм-аут:

eventEmitter.on('response', function(res) {
    setTimeout(function() { /* handle res */ }, 2000);
});

С помощью Rx вы можете создать поток событий новый, который представляет собой просто исходный поток, применяемый к функции задержки:

responseStream.delay(2000).subscribe(function(res) {
    /* handle res */
});

delay() - простая функция, и один из доступен многим другим. Существует так много разных способов модификации потоков, что становится возможным программировать множество логики приложения, просто преобразуя потоки со всеми возможными функциями вместо того, чтобы полагаться на низкоуровневую логику, такую ​​как setTimeout.

Также проверьте эту визуальную и интерактивную разведку этих функций.

Ответ 2

Наблюдаемые не похожи на EventEmitters. В некоторых случаях они могут действовать как EventEmitters, а именно, когда они многоадресные с использованием тем RxJS, но обычно они не действуют как EventEmitters.

Короче говоря, субъект RxJS похож на EventEmitter, но наблюдаемый RxJS - более общий интерфейс. Наблюдаемые больше похожи на функции с нулевыми аргументами.

Учтите следующее:


function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

Конечно, мы все ожидаем увидеть как результат:

"Hello"
42
"Hello"
42

Вы можете написать то же поведение выше, но с Observables:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

И вывод такой же:

"Hello"
42
"Hello"
42

Это потому, что обе функции и Observables ленивые вычисления. Если вы не вызовете функцию, console.log('Hello') не произойдет. Также с Observables, если вы не "позвоните" (subscribe), console.log('Hello') не произойдет. Кроме того, "вызов" или "подписка" - это независимая операция: два вызова функций вызывают два отдельных побочных эффекта, а две наблюдаемые подписки вызывают два отдельных побочных эффекта. В отличие от EventEmitters, которые разделяют побочные эффекты и имеют активное выполнение независимо от наличия подписчиков, Observables не имеют общего выполнения и ленивы.


До сих пор нет разницы между поведением функции и наблюдаемой. Этот вопрос StackOverflow лучше сформулировать как "RxJS Observables vs functions?".

Некоторые люди утверждают, что Observables являются асинхронными. Это неправда. Если вы окружаете вызов функции журналами, например так:

console.log('before');
console.log(foo.call());
console.log('after');

Вы, очевидно, увидите результат:

"before"
"Hello"
42
"after"

И это то же самое поведение с Observables:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

И вывод:

"before"
"Hello"
42
"after"

Что доказывает, что подписка на foo была полностью синхронной, как функция.


Так в чем же разница между наблюдаемой и функцией?

Наблюдаемые объекты могут "возвращать" несколько значений с течением времени, чего нет у функций. Вы не можете сделать это:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Функции могут возвращать только одно значение. Наблюдаемые, однако, могут сделать это:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

С синхронным выходом:

"before"
"Hello"
42
100
200
"after"

Но вы также можете "возвращать" значения асинхронно:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () {
    observer.next(300);
  }, 1000);
});

С выходом:

"before"
"Hello"
42
100
200
"after"
300

Заключить,

  • func.call() означает "дать мне одно значение немедленно (синхронно)"
  • obsv.subscribe() означает "дай мне значения. Может быть, многие из них, может быть, синхронно, может быть асинхронно"

Вот как наблюдаемые являются обобщением функций (которые не имеют аргументов).

Ответ 3

Когда приёмник подключается к эмиттеру?

С помощью излучателей событий слушатели уведомляются о каждом событии, которое их интересует. Когда новый слушатель будет добавлен после события, он не узнает о прошлом событии. Также новый слушатель не будет знать историю событий, которые произошли раньше. Конечно, мы могли бы вручную запрограммировать наш эмиттер и слушатель для обработки этой пользовательской логики.

С реактивными потоками абонент получает поток событий, который произошел с самого начала. Поэтому время, когда он подписывается, не является строгим. Теперь он может выполнять множество операций над потоком, чтобы получить подпоток событий, которые ему интересны.

Преимущество этого:

  • когда нам нужно обработать события, которые произошли с течением времени
  • порядок, в котором они произошли
  • в которых произошли события (скажем, после каждого события покупки в акции Google событие продажи на складе Microsoft происходит в течение 5 минут).

Потоки более высокого порядка:

Поток более высокого порядка представляет собой "поток потоков": поток, значения которого сами являются потоками.

С эмитентами событий один из способов сделать это - это тот же самый прослушиватель, подключенный к нескольким излучателям событий. Это становится сложным, когда нам нужно сопоставить событие, произошедшее на разных излучателях.

С реактивными потоками это бриз. Пример из mostjs (это библиотека реактивного программирования, такая как RxJS, но более эффективная)

const firstClick = most.fromEvent('click', document).take(1);
const mousemovesAfterFirstClick = firstClick.map(() =>
    most.fromEvent('mousemove', document)
        .takeUntil(most.of().delay(5000)))

В приведенном выше примере мы сопоставляем события кликов с событиями перемещения мыши. Выделение шаблонов между событиями становится настолько проще, когда события доступны в виде потока.

Сказав это, с помощью EventEmitter мы могли бы выполнить все это путем разработки наших излучателей и слушателей. Он нуждается в разработке, потому что он не предназначен в первую очередь для таких сценариев. В то время как реактивные потоки делают это настолько плавно, потому что предназначены для решения таких проблем.