Каков правильный способ обработки обратного давления в потоке преобразования node.js?


Это мои первые приключения в написании серверной части node.js. Это было до сих пор весело, но я с трудом понимаю правильный путь для реализации чего-то, связанного с потоками node.js.


В тестовых и учебных целях я работаю с большими файлами, чьи содержимое zlib сжато. Сжатый контент представляет собой двоичные данные, каждый пакет длиной 38 байт. Я пытаюсь создать результирующий файл который выглядит почти идентично исходному файлу, за исключением того, что есть несжатый 31-байтовый заголовок для каждых 1024 38-байтовых пакетов.

исходное содержимое файла (распаковано)

| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |

итоговое содержимое файла

| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |

Как вы можете видеть, это проблема перевода. Смысл, я взяв некоторый поток источника в качестве входных данных, а затем слегка преобразуя его в некоторый выходной поток. Поэтому было естественным Преобразовать поток.

Класс просто пытается выполнить следующее:

  • Принимает поток в качестве ввода
  • zlib раздувает куски данных для подсчета количества пакетов,  собрав 1024 из них, zlib deflating, и  добавление заголовка.
  • Пропускает новый результирующий фрагмент через конвейер через   this.push(chunk).

Пример использования:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');

Вопрос (ы)

Предполагая, что Трансформация является хорошим выбором для этого варианта использования, я, кажется, столкнувшись с возможной проблемой противодавления. Мой вызов this.push(chunk) внутри _transform продолжает возвращать false. Почему это было бы и как обрабатывать такие вещи?


Ответ 1

Я думаю, что Transform подходит для этого, но я буду выполнять надувание как отдельный шаг в конвейере.

Вот быстрый и в значительной степени непроверенный пример:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take first 1024 packets.
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    // TODO: handle `err`
    if (done) {

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
  } else {

// Flush any remaining buffers.
transformer._flush = function() {

// Example:
var fs = require('fs');

Ответ 2

push вернет false, если поток, который вы пишете (в данном случае, поток вывода файла) имеет слишком много буферизированных данных. Поскольку вы пишете на диск, это имеет смысл: вы обрабатываете данные быстрее, чем можете их записать.

Когда буфер out заполнен, ваш поток преобразования не сможет нажать и начать буферизацию данных. Если этот буфер должен заполнить, то inp начнет заполняться. Вот как все должно работать. Потоки с потоками только собираются обрабатывать данные так быстро, как может обрабатывать самое медленное звено в цепочке (как только ваши буферы заполнены).

Ответ 3

Этот вопрос с 2013 года - это все, что я смог найти о том, как бороться с "противодавлением", при создании node Transform streams.

Из node 7.10.0 Преобразовать поток и Считываемый поток документация, что я собрал заключалось в том, что когда push возвращается false, ничего больше не нужно нажимать до тех пор, пока _read не будет называется.

В документации Transform не упоминается _read, за исключением того, что базовое преобразование класс реализует его (и _write). Я нашел информацию о push, возвращающем false и _read вызывается в Документация с читаемым потоком.

Единственный другой авторитетный комментарий, который я нашел на обратном давлении трансформации, упоминается только это как проблема, и это было в комментарии в верхней части node файла _stream_transform.js.

Здесь раздел о противодавлении этого комментария:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.

Пример решения

Здесь решение, которое я собрал вместе для обработки противодавления в потоке преобразования который я уверен, работает. (Я не написал никаких реальных тестов, которые потребуют записывая записываемый поток для контроля обратного давления.)

Это рудиментарное преобразование линии, которое нуждается в работе как преобразование строки, но делает демонстрируют обратное давление ".

const stream = require('stream');

class LineTransform extends stream.Transform

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;

    _transform(chunk, encoding, callback)
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
                let backpressure = false;
                while (nextLine < lines.length)

                    if (!this.push(lines[nextLine++] + "\n"))
                        // we've got more to push, but we got backpressure so it has to wait.
                        if (backpressure)

                        backpressure = !this.push(lines[nextLine++] + "\n");

                // DEBUG: Uncomment for debugging help to see what going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();

        // Start pushing the lines

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;

        if (this._lastLine.length > 0)
            this._lastLine = "";

        return callback();

        // DEBUG: Uncomment for debugging help to see what going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)

Я протестировал выше, запустив его с линиями DEBUG, раскомментированными на линии 10000 ~ 200 КБ файл. Перенаправить stdout или stderr в файл (или оба) для разделения отладки выписки из ожидаемого результата. (node test.js > out.log 2> err.log)

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });

Полезная подсказка для отладки

При написании этого изначально я не понимал, что _read можно назвать до _transform вернулся, поэтому я не реализовал защиту this._transforming, и я был получив следующую ошибку:

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

Глядя на реализацию node, я понял, что эта ошибка означает, что обратный вызов данный _transform вызывается более одного раза. Не так много информации чтобы найти об этой ошибке, так что я думал, что включу то, что я понял здесь.

Ответ 4

В последнее время возникла аналогичная проблема, требующая обработки противодавления в раздувающем потоке преобразования - секрет обработки push(), возвращающей false, заключается в регистрации и обработке события 'drain' в потоке

_transform(data, enc, callback) {
  const continueTransforming = () => {
    ... do some work / parse the data, keep state of where we're at etc
         this._readableState.pipes.once('drain', continueTransforming); // will get called again when the reader can consume more data

ЗАМЕЧАНИЕ: это немного странно, так как мы обращаемся к внутренностям, и pipes может быть даже массивом Readable, но он работает в общем случае ....pipe(transform).pipe(...

Было бы здорово, если бы кто-то из сообщества Node мог предложить "правильный" метод для обработки .push(), возвращающего false

Ответ 5

В итоге я последовал примеру Ledion и создал служебный класс Transform, который помогает с противодавлением. Утилита добавляет асинхронный метод с именем addData, который может ожидать реализующий Transform.

'use strict';

const { Transform } = require('stream');

 * The BackPressureTransform class adds a utility method addData which
 * allows for pushing data to the Readable, while honoring back-pressure.
class BackPressureTransform extends Transform {
  constructor(...args) {

   * Asynchronously add a chunk of data to the output, honoring back-pressure.
   * @param {String} data
   * The chunk of data to add to the output.
   * @returns {Promise<void>}
   * A Promise resolving after the data has been added.
  async addData(data) {
    // if .push() returns false, it means that the readable buffer is full
    // when this occurs, we must wait for the internal readable to emit
    // the 'drain' event, signalling the readable is ready for more data
    if (!this.push(data)) {
      await new Promise((resolve, reject) => {
        const errorHandler = error => {
          this.emit('error', error);
        const boundErrorHandler = errorHandler.bind(this);

        this._readableState.pipes.on('error', boundErrorHandler);
        this._readableState.pipes.once('drain', () => {
          this._readableState.pipes.removeListener('error', boundErrorHandler);

module.exports = {

Используя этот служебный класс, мои Transforms теперь выглядят так:

'use strict';

const { BackPressureTransform } = require('./back-pressure-transform');

 * The Formatter class accepts the transformed row to be added to the output file.
 * The class provides generic support for formatting the result file.
class Formatter extends BackPressureTransform {
  constructor() {
      encoding: 'utf8',
      readableObjectMode: false,
      writableObjectMode: true

    this.anyObjectsWritten = false;

   * Called when the data pipeline is complete.
   * @param {Function} callback
   * The function which is called when final processing is complete.
   * @returns {Promise<void>}
   * A Promise resolving after the flush completes.
  async _flush(callback) {
    // if any object is added, close the surrounding array
    if (this.anyObjectsWritten) {
      await this.addData('\n]');


   * Given the transformed row from the ETL, format it to the desired layout.
   * @param {Object} sourceRow
   * The transformed row from the ETL.
   * @param {String} encoding
   * Ignored in object mode.
   * @param {Function} callback
   * The callback function which is called when the formatting is complete.
   * @returns {Promise<void>}
   * A Promise resolving after the row is transformed.
  async _transform(sourceRow, encoding, callback) {
    // before the first object is added, surround the data as an array
    // between each object, add a comma separator
    await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');

    // update state
    this.anyObjectsWritten = true;

    // add the object to the output
    const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
    for (const [index, row] of parsed.entries()) {
      // prepend the row with 2 additional spaces since we're inside a larger array
      await this.addData('  ${row}');

      // add line breaks except for the last row
      if (index < parsed.length - 1) {
        await this.addData('\n');


module.exports = {

Ответ 6

Думаю, Майк Липперт ответ - самый близкий к истине. Похоже, что ожидание нового вызова _read(), чтобы начать снова из потока чтения, является единственным способом, которым Transform активно уведомляется о том, что читатель готов. Я хотел бы поделиться простым примером того, как я временно переопределяю _read().

_transform(buf, enc, callback) {

  // prepend any unused data from the prior chunk.
  if (this.prev) {
    buf = Buffer.concat([ this.prev, buf ]);
    this.prev = null;

  // will keep transforming until buf runs low on data.
  if (buf.length < this.requiredData) {
    this.prev = buf;
    return callback();

  var result = // do something with data...
  var nextbuf = buf.slice(this.requiredData);

  if (this.push(result)) {
    // Continue transforming this chunk
    this._transform(nextbuf, enc, callback);
  else {
    // Node is warning us to slow down (applying "backpressure")
    // Temporarily override _read request to continue the transform
    this._read = function() {
        delete this._read;
        this._transform(nextbuf, enc, callback);