Подтвердить что ты не робот

Node.js Потоки против наблюдаемых

Узнав о Observables, я нахожу их очень похожими на Node.js-потоки. У обоих есть механизм уведомления потребителя, когда появляются новые данные, происходит ошибка или нет данных (EOF).

Я хотел бы узнать о концептуальных/функциональных различиях между ними. Спасибо!

4b9b3361

Ответ 1

Оба Observables и node.js Потоки позволяют решить одну и ту же основную проблему: асинхронно обрабатывать последовательность значений. Основное различие между этими двумя, я считаю, связано с контекстом, который мотивировал его появление. Этот контекст отражен в терминологии и API.

В стороне Observables у вас есть расширение для EcmaScript, которое вводит модель реактивного программирования. Он пытается заполнить пробел между генерированием ценности и асинхронностью с минималистскими и составными понятиями Observer и Observable.

В разделе node.js и Streams вы хотели создать интерфейс для асинхронной и эффективной обработки сетевых потоков и локальных файлов. Терминология вытекает из этого начального контекста, и вы получаете pipe, chunk, encoding, flush, Duplex, Buffer и т.д. Имея прагматичный подход, который обеспечивает явную поддержку для конкретных случаев использования, вы теряют способность сочинять вещи, потому что это не так единообразно. Например, вы используете push в потоке Readable и write в Writable, хотя, по идее, вы делаете то же самое: публикуете значение.

Итак, на практике, если вы посмотрите на концепции, и если вы используете опцию { objectMode: true }, вы можете сопоставить Observable с потоком Readable и Observer с потоком Writable. Вы даже можете создать несколько простых адаптеров между двумя моделями.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

Возможно, вы заметили, что я изменил несколько имен и использовал более простые понятия Observer и Subscription, введенные здесь, чтобы избежать перегрузки ответственности, выполняемой Observables в Generator. В принципе, Subscription позволяет отписаться от него Observable. Во всяком случае, с приведенным выше кодом вы можете иметь pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

По сравнению с process.stdin.pipe(process.stdout) у вас есть способ комбинировать, фильтровать и преобразовывать потоки, которые также работают для любой другой последовательности данных. Вы можете достичь этого с помощью потоков Readable, Transform и Writable, но API поддерживает подклассу вместо привязки Readable и применения функций. В модели Observable Например, преобразование значений соответствует применению функции трансформатора к потоку. Он не требует нового подтипа Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

Вывод? Легко ввести реактивную модель и концепцию Observable в любом месте. Сложнее реализовать целую библиотеку вокруг этой концепции. Все эти небольшие функции должны работать последовательно. В конце концов, проект ReactiveX все еще продолжается. Но если вам действительно нужно отправить содержимое файла клиенту, поработать с кодировкой, а затем закрепить его в поддержке NodeJS, и он работает очень хорошо.