Подтвердить что ты не робот

Реализация потока буферизованного преобразования

Я пытаюсь реализовать поток с новым Node.js потоком API, который будет буферизовать определенный объем данных. Когда этот поток передается по каналу в другой поток или если что-то потребляет события readable, этот поток должен очищать свой буфер, а затем просто переходить через проход. Ловушка состоит в том, что этот поток будет передан по каналам во многие другие потоки, и когда каждый целевой поток будет подключен, буфер должен быть сброшен, даже если он уже сброшен в другой поток.

Например:

  • BufferStream реализует stream.Transform и сохраняет внутренний кольцевой буфер 512 КБ.
  • ReadableStreamA передается по каналу в экземпляр BufferStream
  • BufferStream записывает в свой кольцевой буфер, считывая данные из ReadableStreamA по мере его поступления. (Не имеет значения, потеряны ли данные, поскольку буфер перезаписывает старые данные.)
  • BufferStream подается на WritableStreamB
  • WritableStreamB получает весь буфер 512 Кбайт и продолжает получать данные, поскольку он написан от ReadableStreamA до BufferStream.
  • BufferStream подается на WritableStreamC
  • WritableStreamC также получает весь буфер 512 Кбайт, но этот буфер теперь отличается от того, что получил WritableStreamB, поскольку с тех пор больше данных было записано в BufferStream.

Возможно ли это с помощью API потоков? Единственный метод, о котором я могу думать, - создать объект с помощью метода, который запустит новый поток PassThrough для каждого пункта назначения, то есть я не мог бы просто подключиться к нему и из него.

Для чего это стоит, я сделал это со старым "текущим" API, просто слушая новые обработчики событий data. Когда новая функция была присоединена с помощью .on('data'), я бы назвал ее напрямую с копией кольцевого буфера.

4b9b3361

Ответ 1

Вот мой вопрос по вашей проблеме.

Основная идея - создать поток Transform, который позволит нам выполнить вашу собственную логику буферизации перед отправкой данных на выходе потока:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)

  this.push(chunk)
  done()
}

Затем нам нужно переопределить метод .pipe(), чтобы мы были уведомлены, когда BufferStream передается в поток, что позволяет нам автоматически записывать в него данные:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

Таким образом, когда мы пишем buffer.pipe(someStream), мы выполняем канал по назначению и записываем внутренний буфер в выходной поток. После этого класс Transform заботится обо всем, отслеживая противодавление и многое другое.

Вот рабочий принцип. Обратите внимание, что я не стал писать правильную логику буферизации (т.е. Мне не нужен размер внутреннего буфера), но это должно быть легко исправить.

Ответ 2

Пол отвечает хорошо, но я не думаю, что он соответствует конкретным требованиям. Похоже, что должно произойти то, что каждый раз, когда pipe() вызывается в этом потоке преобразования, ему нужно сначала сбросить буфер, который представляет все накопление данных между временем, когда был создан поток преобразования/(подключен к потоку источника) и время, когда оно было подключено к текущему потоку записи/назначения.

Что-то вроде этого может быть более правильным:

  var BufferStream = function () {
        stream.Transform.apply(this, arguments);
        this.buffer = []; //I guess an array will do
    };

    util.inherits(BufferStream, stream.Transform);

    BufferStream.prototype._transform = function (chunk, encoding, done) {

        this.push(chunk ? String(chunk) : null);
        this.buffer.push(chunk ? String(chunk) : null);

        done()
    };

    BufferStream.prototype.pipe = function (destination, options) {
        var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
        this.buffer.forEach(function (b) {
            res.write(String(b));
        });
        return res;
    };


    return new BufferStream();

Я полагаю, что это:

BufferStream.super_.prototype.pipe.apply(this, arguments);

эквивалентно этому:

stream.Transform.prototype.pipe.apply(this, arguments);

Вы могли бы оптимизировать это и использовать некоторые флаги при вызове pipe/unpipe.