Подтвердить что ты не робот

Создание потока Node.js из двух потоковых потоков

Я хотел бы объединить два потока Node.js в один, если возможно, путем их конвейера. Я использую Transform потоки.

Другими словами, я бы хотел, чтобы моя библиотека возвращала myStream для использования людьми. Например, они могли бы написать:

process.stdin.pipe(myStream).pipe(process.stdout);

И внутренне я использую сторонний vendorStream, который выполняет некоторую работу, подключенную к моей собственной логике, содержащейся в myInternalStream. Итак, что выше было бы переведено:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

Могу ли я сделать что-то подобное? Я пробовал var myStream = vendorStream.pipe(myInternalStream), но это явно не работает.

Чтобы сделать аналогию с bash, скажем, я хочу написать программу, которая проверяет, присутствует ли буква h в последней строке некоторого потока (tail -n 1 | grep h), я могу создать оболочку script:

# myscript.sh
tail -n 1 | grep h

И тогда, если люди:

$ printf "abc\ndef\nghi" | . myscript.sh

Он просто работает.

Это то, что у меня есть до сих пор:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

Возможно ли это? Дайте мне знать, не ясно ли, что я пытаюсь сделать.

Спасибо!

4b9b3361

Ответ 1

Вы можете смотреть, как что-то будет передано в ваш поток, а затем unpipe и передайте его в интересующие вас потоки:

var PassThrough = require('stream').PassThrough;

var stream3 = new PassThrough();

// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
  source.unpipe(this);
  this.transformStream = source.pipe(stream1).pipe(stream2);
});

// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
  return this.transformStream.pipe(destination, options);
};

stdin.pipe(stream3).pipe(stdout);

Вы можете извлечь эту функциональность в свой собственный класс конструктивного потока:

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner = function() {
  this.streams = Array.prototype.slice.apply(arguments);

  this.on('pipe', function(source) {
    source.unpipe(this);
    for(i in this.streams) {
      source = source.pipe(this.streams[i]);
    }
    this.transformStream = source;
  });
};

util.inherits(StreamCombiner, PassThrough);

StreamCombiner.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};

var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);

Ответ 2

Один из вариантов, возможно, заключается в использовании multipipe, который позволяет объединить несколько преобразований в один поток преобразования:

// my-stream.js
var multipipe = require('multipipe');

module.exports = function createMyStream() {
  return multipipe(vendorStream, myInternalStream);
};

Тогда вы можете сделать:

var createMyStream = require('./my-stream');

var myStream = createMyStream();

process.stdin.pipe(myStream).pipe(process.stdout);

Уточнение: это заставляет stdin проходить через vendorStream, затем myInternalStream и, наконец, stdout.