Подтвердить что ты не робот

Как использовать курсор .forEach() в MongoDB с помощью Node.js?

У меня есть огромная коллекция документов в моей БД, и мне интересно, как я могу просматривать все документы и обновлять их, причем каждый документ имеет другое значение.

4b9b3361

Ответ 1

Ответ зависит от используемого драйвера. Все драйверы MongoDB, которые, как я знаю, имеют cursor.forEach() так или иначе.

Вот несколько примеров:

node-mongodb-native

collection.find(query).forEach(function(doc) {
  // handle
}, function(err) {
  // done or error
});

mongojs

db.collection.find(query).forEach(function(err, doc) {
  // handle
});

monk

collection.find(query, { stream: true })
  .each(function(doc){
    // handle doc
  })
  .error(function(err){
    // handle error
  })
  .success(function(){
    // final callback
  });

mongoose

collection.find(query).stream()
  .on('data', function(doc){
    // handle doc
  })
  .on('error', function(err){
    // handle error
  })
  .on('end', function(){
    // final callback
  });

Обновление документов внутри обратного вызова .forEach

Единственная проблема с обновлением документов внутри обратного вызова .forEach заключается в том, что вы не знаете, когда все документы обновляются.

Чтобы решить эту проблему, вы должны использовать некоторое решение для асинхронного управления потоком. Вот несколько вариантов:

Вот пример использования async, используя функцию queue:

var q = async.queue(function (doc, callback) {
  // code for your update
  collection.update({
    _id: doc._id
  }, {
    $set: {hi: 'there'}
  }, {
    w: 1
  }, callback);
}, Infinity);

var cursor = collection.find(query);
cursor.each(function(err, doc) {
  if (err) throw err;
  if (doc) q.push(doc); // dispatching doc to async.queue
});

q.drain = function() {
  if (cursor.isClosed()) {
    console.log('all items have been processed');
    db.close();
  }
}

Ответ 2

Используя драйвер mongodb и современный NodeJS с async/await, хорошим решением будет использование next():

const collection = db.collection('things')
const cursor = collection.find({
  bla: 42 // find all things where bla is 42
});
let document;
while ((document = await cursor.next())) {
  await collection.findOneAndUpdate({
    _id: document._id
  }, {
    $set: {
      blu: 43
    }
  });
}

Это приводит к тому, что одновременно в памяти требуется только один документ, в отличие, например, от принятого ответа, когда многие документы засасываются в память до начала обработки документов. В случаях "огромных коллекций" (согласно вопросу) это может быть важно.

Если документы большие, это можно улучшить, используя проекцию, чтобы из базы данных выбирались только те поля документов, которые необходимы.

Ответ 3


var MongoClient = require('mongodb').MongoClient,
    assert = require('assert');

MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {

    assert.equal(err, null);
    console.log("Successfully connected to MongoDB.");

    var query = {
        "category_code": "biotech"
    };

    db.collection('companies').find(query).toArray(function(err, docs) {

        assert.equal(err, null);
        assert.notEqual(docs.length, 0);

        docs.forEach(function(doc) {
            console.log(doc.name + " is a " + doc.category_code + " company.");
        });

        db.close();

    });

});

Обратите внимание, что вызов .toArray делает приложение для извлечения всего набора данных.


var MongoClient = require('mongodb').MongoClient,
    assert = require('assert');

MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {

    assert.equal(err, null);
    console.log("Successfully connected to MongoDB.");

    var query = {
        "category_code": "biotech"
    };

    var cursor = db.collection('companies').find(query);

    function(doc) {
        cursor.forEach(
                console.log(doc.name + " is a " + doc.category_code + " company.");
            },
            function(err) {
                assert.equal(err, null);
                return db.close();
            }
    );
});

Обратите внимание, что курсор, возвращаемый функцией find(), назначен для var cursor. При таком подходе вместо извлечения всех данных в память и одновременного использования данных мы передаем данные в наше приложение. find() может сразу создать курсор, потому что он фактически не делает запрос к базе данных, пока мы не попытаемся использовать некоторые документы, которые он предоставит. cursor должен описать наш запрос. 2-й параметр к cursor.forEach показывает, что делать, если драйвер исчерпан или возникает ошибка.

В первоначальной версии приведенного выше кода именно toArray() вызывал вызов базы данных. Это означало, что нам нужны ВСЕ документы и мы хотим, чтобы они были в array.

Также MongoDB возвращает данные в пакетном формате. На рисунке ниже показаны запросы от курсоров (из приложения) к MongoDB

MongoDB cursor requests

forEach лучше, чем toArray потому что мы можем обрабатывать документы по мере их поступления до конца. Сравните это с toArray - где мы ожидаем получения ВСЕХ документов и построения всего массива. Это означает, что мы не получаем никаких преимуществ от того факта, что драйвер и система базы данных работают вместе для пакетной обработки результатов для вашего приложения. Пакетирование предназначено для обеспечения эффективности с точки зрения накладных расходов памяти и времени выполнения. Воспользуйтесь этим, если можете в своем приложении.

Ответ 4

Ответ Leonid замечателен, но я хочу укрепить важность использования async/ promises и дать другое решение с примером promises.

Простейшим решением этой проблемы является цикл для каждого документа и вызов обновления. Обычно вам не нужно закрывать соединение db после каждого запроса, но если вам нужно закрыть соединение, будьте осторожны. Вы должны просто закрыть его, если вы уверены, что завершены завершение обновлений all.

Общей ошибкой здесь является вызов db.close() после отправки всех обновлений, не зная, завершены ли они. Если вы это сделаете, вы получите ошибки.

Неверная реализация:

collection.find(query).each(function(err, doc) {
  if (err) throw err;

  if (doc) {
    collection.update(query, update, function(err, updated) {
      // handle
    });
  } 
  else {
    db.close(); // if there is any pending update, it will throw an error there
  }
});

Однако, поскольку db.close() также является асинхронной операцией (ее подпись имеет функцию обратного вызова), вам может быть повезло, и этот код может закончите без ошибок. Он может работать только тогда, когда вам нужно обновить несколько документов в небольшой коллекции (так что не пытайтесь).


Правильное решение:

Поскольку решение с асинксом уже было предложено Leonid, ниже следует решение, использующее Q promises.

var Q = require('q');
var client = require('mongodb').MongoClient;

var url = 'mongodb://localhost:27017/test';

client.connect(url, function(err, db) {
  if (err) throw err;

  var promises = [];
  var query = {}; // select all docs
  var collection = db.collection('demo');
  var cursor = collection.find(query);

  // read all docs
  cursor.each(function(err, doc) {
    if (err) throw err;

    if (doc) {

      // create a promise to update the doc
      var query = doc;
      var update = { $set: {hi: 'there'} };

      var promise = 
        Q.npost(collection, 'update', [query, update])
        .then(function(updated){ 
          console.log('Updated: ' + updated); 
        });

      promises.push(promise);
    } else {

      // close the connection after executing all promises
      Q.all(promises)
      .then(function() {
        if (cursor.isClosed()) {
          console.log('all items have been processed');
          db.close();
        }
      })
      .fail(console.error);
    }
  });
});

Ответ 5

Теперь node-mongodb-native поддерживает параметр endCallback для cursor.forEach, как для обработки события ПОСЛЕ всей итерации, обратитесь к официальному документу за подробностями http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach.

Также обратите внимание, что . Каждый теперь устарел в собственном драйвере nodejs.

Ответ 6

И вот пример использования асинхронного курсора Mongoose с promises:

new Promise(function (resolve, reject) {
  collection.find(query).cursor()
    .on('data', function(doc) {
      // ...
    })
    .on('error', reject)
    .on('end', resolve);
})
.then(function () {
  // ...
});

Ссылка:

Ответ 7

Ни в одном из предыдущих ответов не упоминается о пакетных обновлениях. Это делает их чрезвычайно медленными - в десятки или сотни раз медленнее, чем решение, использующее bulkWrite.

Допустим, вы хотите удвоить значение поля в каждом документе. Вот как это сделать быстро 💨 и с фиксированным потреблением памяти:

// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100;  // how many documents to write at once
let i = 0;
db.collection.find({ ... }).forEach(doc => {
  i++;

  // Update the document...
  doc.foo = doc.foo * 2;

  // Add the update to an array of bulk operations to execute later
  bulkWrites.push({
    replaceOne: {
      filter: { _id: doc._id },
      replacement: doc,
    },
  });

  // Update the documents and log progress every 'bulkDocumentsSize' documents
  if (i % bulkDocumentsSize === 0) {
    db.collection.bulkWrite(bulkWrites);
    bulkWrites = [];
    print('Updated ${i} documents');
  }
});
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);