Подтвердить что ты не робот

Объединение двух (или n) потоков

  • 2 потока:

    Учитывая читаемые потоки stream1 и stream2, какой идиоматический (сжатый) способ получить поток, содержащий stream1 и stream2 сцепляются

    Я не могу сделать stream1.pipe(outStream); stream2.pipe(outStream), потому что тогда содержимое потока смешано вместе.

  • n потоков:

    Учитывая EventEmitter, который испускает неопределенное количество потоков, например

    eventEmitter.emit('stream', stream1)
    eventEmitter.emit('stream', stream2)
    eventEmitter.emit('stream', stream3)
    ...
    eventEmitter.emit('end')
    

    какой идиоматический (сжатый) способ получить поток со всеми конкатенированными потоками?

4b9b3361

Ответ 1

Пакет combined-stream объединяет потоки. Пример из README:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

Я считаю, что вам нужно добавить все потоки сразу. Если очередь запущена пустым, combinedStream автоматически завершается. См. вопрос № 5.

Библиотека stream-stream является альтернативой с явным .end, но она гораздо менее популярна и, по-видимому, не так хорошо проверена, Он использует API streams2 Node 0.10 (см. это обсуждение).

Ответ 2

это можно сделать с помощью vanilla nodejs

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    let waiting = streams.length
    for (let stream of streams) {
        pass = stream.pipe(pass, {end: false})
        stream.once('end', () => --waiting === 0 && pass.emit('end'))
    }
    return pass
}

Ответ 3

Простая операция сокращения должна быть в порядке в nodejs !

const {PassThrough} = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
  return pt
}, new PassThrough())

Ура;)

Ответ 4

Возможно, вы сможете сделать его более кратким, но здесь тот, который работает:

var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ConcatStream(streamStream) {
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;

  var streams = [];
  streamStream.on('stream', function(stream){
    stream.pause();
    streams.push(stream);
    ensureState();
  });

  streamStream.on('end', function() {
    streamsEnded = true;
    ensureState();
  });

  var ensureState = function() {
    if(isStreaming) return;
    if(streams.length == 0) {
      if(streamsEnded)
        that.emit('end');
      return;
    }
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  };

  var onData = function(data) {
    that.emit('data', data);
  };

  var onEnd = function() {
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  };
}

util.inherits(ConcatStream, EventEmitter);

Мы отслеживаем состояние с streams (очередь потоков, push назад и shift спереди), isStreaming и streamsEnded. Когда мы получаем новый поток, мы его нажимаем, и когда поток заканчивается, мы перестаем слушать и сдвигать его. Когда поток потоков заканчивается, мы устанавливаем streamsEnded.

В каждом из этих событий мы проверяем состояние, в котором мы находимся. Если мы уже потоки (поточный поток), мы ничего не делаем. Если очередь пуста и streamsEnded установлена, мы выделяем событие end. Если в очереди есть что-то, мы возобновляем его и слушаем его события.

* Обратите внимание, что pause и resume являются рекомендательными, поэтому некоторые потоки могут вести себя некорректно и требуют буферизации. Это упражнение предоставляется читателю.

Сделав все это, я сделал бы дело n=2, построив EventEmitter, создав с ним ConcatStream и испустив два события stream, за которыми следует событие end. Я уверен, что это можно сделать более кратко, но мы можем использовать то, что у нас есть.

Ответ 5

https://github.com/joepie91/node-combined-stream2 - это замена, совместимая с Streams2 для модуля комбинированного потока (который описан выше). Он автоматически обертывает Streams1.

Пример кода для комбинированного потока2:

var CombinedStream = require('combined-stream2');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

Ответ 6

streamee.js - это набор трансформаторов и композиторов на основе потоков node 1.0+ и включает метод concatenate:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);

Ответ 7

В ванильных нодах, использующих ECMA 15+ и сочетающих хорошие ответы Иво и Фенга.

Класс PassThrough является тривиальным потоком Transform который никак не изменяет поток.

const { PassThrough } = require('stream');

const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the stream ends, the counter is decremented.
    // Once the counter reaches 0, the mergedstream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  }, new PassThrough());

Можно использовать так:

const mergedStreams = concatStreams([stream1, stream2, stream3]);