Подтвердить что ты не робот

Node.js Проводка одного и того же читаемого потока на несколько (записываемых) целей

Мне нужно запустить сразу две команды, которые должны читать данные из одного потока. После подачи потока в другой буфер освобождается, поэтому я не могу читать данные из этого потока снова, поэтому это не работает:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Запрос жалуется на это

Error: You cannot pipe after data has been emitted from the response.

и изменение inputStream на fs.createWriteStream дает ту же проблему, конечно. Я не хочу записывать в файл, но каким-то образом повторно использовать поток, создаваемый request (или любой другой).

Есть ли способ повторно использовать читаемый поток после завершения конвейера? Какой был бы лучший способ выполнить что-то вроде приведенного выше примера?

4b9b3361

Ответ 1

Вы не можете повторно использовать переданные данные, которые уже отправлены. И вы не можете передать поток после своего "конца". Таким образом, вы не можете обрабатывать тот же поток дважды, и вам нужны два потока. Вы должны создать дубликат потока, подключив его к двум потокам. Вы можете создать простой поток с потоком PassThrough, он просто передает ввод на вывод.

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;

a.stdout.pipe(b);
a.stdout.pipe(c);

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

Выход

8
hi user

Ответ 2

Первый ответ работает только в том случае, если потоки занимают примерно одинаковое количество времени для обработки данных. Если кто-то занимает значительно больше времени, тем быстрее запрашиваются новые данные, следовательно, перезаписываются данные, все еще используемые более медленным (у меня была эта проблема после попытки ее решить с использованием дублированного потока).

Следующий шаблон работал очень хорошо для меня. Он использует библиотеку на основе потоков Stream2, Streamz и Promises для синхронизации потоков async посредством обратного вызова. Используя знакомый пример из первого ответа:

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
  next(); //request more
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

Ответ 3

Для общей проблемы следующий код работает отлично

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')

Ответ 4

Как насчет трубопроводов в два или более потока не в одно и то же время?

Например:

var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
   mypass.pipe(file2);
},2000)

Приведенный выше код не вызывает ошибок, но файл2 пуст

Ответ 5

У меня есть другое решение для записи в два потока одновременно, естественно, время для записи будет добавлением двух раз, но я использую его для ответа на запрос загрузки, где я хочу сохранить копию загруженный файл на моем сервере (на самом деле я использую резервную копию S3, поэтому я кэширую наиболее используемые файлы локально, чтобы избежать множественной передачи файлов)

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

Затем вы можете использовать это как обычный OutputStream

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

и передать его вашему методу, как если бы это был ответ или файлOutputStream