-
2 потока:
Учитывая читаемые потоки
stream1
иstream2
, какой идиоматический (сжатый) способ получить поток, содержащийstream1
иstream2
сцепляютсяЯ не могу сделать
stream1.pipe(outStream); stream2.pipe(outStream)
, потому что тогда содержимое потока смешано вместе. -
n потоков:
Учитывая EventEmitter, который испускает неопределенное количество потоков, например
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
какой идиоматический (сжатый) способ получить поток со всеми конкатенированными потоками?
Объединение двух (или n) потоков
Ответ 1
Пакет combined-stream объединяет потоки. Пример из README:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
Я считаю, что вам нужно добавить все потоки сразу. Если очередь запущена пустым, combinedStream
автоматически завершается. См. вопрос № 5.
Библиотека stream-stream является альтернативой с явным .end
, но она гораздо менее популярна и, по-видимому, не так хорошо проверена, Он использует API streams2 Node 0.10 (см. это обсуждение).
Ответ 2
это можно сделать с помощью vanilla nodejs
import { PassThrough } from 'stream'
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once('end', () => --waiting === 0 && pass.emit('end'))
}
return pass
}
Ответ 3
Простая операция сокращения должна быть в порядке в nodejs !
const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
s.pipe(pt, {end: false})
s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
return pt
}, new PassThrough())
Ура;)
Ответ 4
Возможно, вы сможете сделать его более кратким, но здесь тот, который работает:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on('end', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit('end');
return;
}
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit('data', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
Мы отслеживаем состояние с streams
(очередь потоков, push
назад и shift
спереди), isStreaming
и streamsEnded
. Когда мы получаем новый поток, мы его нажимаем, и когда поток заканчивается, мы перестаем слушать и сдвигать его. Когда поток потоков заканчивается, мы устанавливаем streamsEnded
.
В каждом из этих событий мы проверяем состояние, в котором мы находимся. Если мы уже потоки (поточный поток), мы ничего не делаем. Если очередь пуста и streamsEnded
установлена, мы выделяем событие end
. Если в очереди есть что-то, мы возобновляем его и слушаем его события.
* Обратите внимание, что pause
и resume
являются рекомендательными, поэтому некоторые потоки могут вести себя некорректно и требуют буферизации. Это упражнение предоставляется читателю.
Сделав все это, я сделал бы дело n=2
, построив EventEmitter
, создав с ним ConcatStream
и испустив два события stream
, за которыми следует событие end
. Я уверен, что это можно сделать более кратко, но мы можем использовать то, что у нас есть.
Ответ 5
https://github.com/joepie91/node-combined-stream2 - это замена, совместимая с Streams2 для модуля комбинированного потока (который описан выше). Он автоматически обертывает Streams1.
Пример кода для комбинированного потока2:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
Ответ 6
streamee.js - это набор трансформаторов и композиторов на основе потоков node 1.0+ и включает метод concatenate:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
Ответ 7
В ванильных нодах, использующих ECMA 15+ и сочетающих хорошие ответы Иво и Фенга.
Класс PassThrough
является тривиальным потоком Transform
который никак не изменяет поток.
const { PassThrough } = require('stream');
const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
.reduce((mergedStream, stream) => {
// pipe each stream of the array into the merged stream
// prevent the automated 'end' event from firing
mergedStream = stream.pipe(mergedStream, { end: false });
// rewrite the 'end' event handler
// Every time one of the stream ends, the counter is decremented.
// Once the counter reaches 0, the mergedstream can emit its 'end' event.
stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
return mergedStream;
}, new PassThrough());
Можно использовать так:
const mergedStreams = concatStreams([stream1, stream2, stream3]);