Подтвердить что ты не робот

Как создать читаемый поток с источником данных async в NodeJs?

Окружающая среда: NodeJS, Express, DynamoDB (но может быть любая база данных действительно)

Сценарий: Необходимо прочитать большое количество записей и вернуться к пользователю в качестве загружаемого файла. Это означает, что я не могу сразу загрузить весь контент, а затем отправить его в ответ от Express. Кроме того, мне может потребоваться выполнить запрос несколько раз, поскольку все данные могут не возвращаться в одном запросе.

Предлагаемое решение: Используйте читаемый поток, который можно передать в поток ответов в Express.

Я начал с создания объекта, который наследует от stream.Readable и реализовал метод _read(), который подталкивает результаты запроса. Проблема заключается в том, что запрос базы данных, вызываемый в _read(), является асинхронным, но stream.read() является методом синхронизации.

Когда поток передается по ответу на сервер, чтение вызывается несколько раз, прежде чем запрос db даже получит возможность выполнить. Таким образом, запрос вызывается несколько раз, и даже когда первый экземпляр запроса заканчивается и выполняет push (null), остальные запросы завершаются, и я получаю ошибку "push() после EOF".

  • Есть ли способ сделать это правильно с помощью _read()?
  • Должен ли я забыть о _read() и просто выполнить запрос, а push() приводит к конструктору?
  • Должен ли я выполнять запрос и испускать события данных вместо push()?

Спасибо

function DynamoDbResultStream(query, options){
    if(!(this instanceof DynamoDbResultStream)){
        return new DynamoDbResultStream(query, options);
    }

    Readable.call(this, options);

    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function(){
    var self = this;
    if(!this.done){
        dynamoDB.query(this.dbQuery, function(err, data) {
            if (!err) {
                try{
                    for(i=0;i<data.Items.length;i++){
                        self.push(data.Items[i]);
                    }
                }catch(err){
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    //Next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                }else{
                    self.done=true;
                    self.push(null);
                }
            }else{
                 console.log(err);
                 self.emit('error',err);
            }
        });
    }else{
        self.push(null);
    }
};

EDIT: После публикации этого вопроса я нашел этот пост с ответом, который показывает, как это сделать, не используя наследование: Как вызвать асинхронную функцию внутри читаемого потока node.js

Был сделан комментарий, что внутри _read() должно быть только одно нажатие(). И каждый push() обычно генерирует еще один вызов read().

4b9b3361

Ответ 1

Помните о различных режимах потока: https://nodejs.org/api/stream.html#stream_two_modes

const Readable = require('stream').Readable;

// starts in paused mode
const readable = new Readable();

let i = 0;
fetchMyAsyncData() {
  setTimeout(() => {
    // still remains in paused mode
    readable.push(++i);

    if (i === 5) {
      return readable.emit('end');
    }

    fetchMyAsyncData();
  }, 500);    
}

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods."
app.get('/mystreamingresponse' (req, res) => {

  // remains in paused mode
  readable.on('readable', () => res.write(readable.read()));

  fetchMyAsyncData();

  // closes the response stream once all external data arrived
  readable.on('end', () => res.end());
})