Подтвердить что ты не робот

Как сохранить вложения MongoDB во время отключения в node.js?

Мы читаем XML файл (используя xml-stream) с примерно 500k элементами и вставляем их в MongoDB следующим образом:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

Вставить в writeDataToDb(type, obj) выглядит следующим образом:

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

Теперь, когда соединение Mongo отключилось, поток xml все еще читает, и консоль заливается сообщениями об ошибках (невозможно вставить, отключить, отключить EPIPE,...).

В docs говорится:

Когда вы завершаете процесс mongod, драйвер останавливает операции обработки и сохраняет их буферизацию из-за того, что bufferMaxEntries равен -1 по умолчанию, что означает буферизацию всех операций.

Что делает этот буфер действительно?

Мы замечаем, когда мы вставляем данные и закрываем сервер mongo, вещи буферизуются, затем мы возвращаем сервер mongo, собственный драйвер успешно восстанавливается и node возобновляет вставку данных, но буферизованные документы (во время mongo beeing offline ) не вставлены снова.

Поэтому я задаю этот буфер и его использование.

Цель:

Мы ищем лучший способ сохранить вставки в буфер до тех пор, пока mongo не вернется (в 15000 миллисекунд в соответствии с wtimeout), а затем вставьте буферизованные документы или используйте xml.pause(); и xml.resume(), которые мы пробовали без успех.

В основном нам нужна небольшая помощь в том, как обрабатывать разъединения без потери данных или прерываний.

4b9b3361

Ответ 1

Вставка 500K элементов с insertOne() - очень плохая идея. Вместо этого вы должны использовать массовые операции, который позволяет вставлять много документов в один запрос. (здесь, например, 10000, так что это можно сделать в 50 отдельных запросах) Чтобы избежать проблемы с буферизацией, вы можете вручную ее обработать:

  • Отключить буферизацию с помощью bufferMaxEntries: 0
  • Установить свойства повторного подключения: reconnectTries: 30, reconnectInterval: 1000
  • Создайте bulkOperation и отправьте его 10000 элементам.
  • Приостановить xml-ридер. Попробуйте вставить 10000 предметов. Если это не удается, повторите попытку каждые 3000 мс до тех пор, пока он не добьется успеха.
  • Вы можете столкнуться с некоторыми повторяющимися проблемами с идентификатором, если массовая операция прервана во время выполнения, поэтому игнорируйте их (код ошибки: 11000)

вот пример script:

var fs = require('fs')
var Xml = require('xml-stream')

var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'

MongoClient.connect(url, {
  reconnectTries: 30,
  reconnectInterval: 1000,
  bufferMaxEntries: 0
}, function (err, db) {
  if (err != null) {
    console.log('connect error: ' + err)
  } else {
    var collection = db.collection('product')
    var bulk = collection.initializeUnorderedBulkOp()
    var totalSize = 500001
    var size = 0

    var fileStream = fs.createReadStream('data.xml')
    var xml = new Xml(fileStream)
    xml.on('endElement: product', function (product) {
      bulk.insert(product)
      size++
      // if we have enough product, save them using bulk insert
      if (size % 10000 == 0) {
        xml.pause()
        bulk.execute(function (err, result) {
          if (err == null) {
            bulk = collection.initializeUnorderedBulkOp()
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
            xml.resume()
          } else {
            console.log('bulk insert failed: ' + err)
            counter = 0
            var retryInsert = setInterval(function () {
              counter++
              bulk.execute(function (err, result) {
                if (err == null) {
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else if (err.code === 11000) { // ignore duplicate ID error
                  clearInterval(retryInsert)
                  bulk = collection.initializeUnorderedBulkOp()
                  console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                  xml.resume()
                } else {
                  console.log('failed after first try: ' + counter, 'error: ' + err)
                }
              })
            }, 3000) // retry every 3000ms until success
          }
        })
      } else if (size === totalSize) {
        bulk.execute(function (err, result) {
          if (err == null) {
            db.close()
          } else {
            console.log('bulk insert failed: ' + err)
          }
        })
      }
    })
  }
})

вывод журнала:

doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]

Ответ 2

Я не знаю конкретно о драйвере Mongodb и этом буфере записей. Возможно, он хранит данные только в определенных сценариях.

Поэтому я отвечу на этот вопрос более общим подходом, который может работать с любой базой данных.

Подводя итог, у вас есть две проблемы:

  • Вы не восстанавливаете неудавшиеся попытки.
  • Слишком быстрый поток данных XML

Чтобы справиться с первой проблемой, вам нужно реализовать алгоритм повтора, который обеспечит выполнение многих попыток до отказа.

Чтобы справиться со второй проблемой, вам нужно реализовать обратное давление на поток xml. Вы можете сделать это с помощью метода pause, метода resume и входного буфера.

var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');

var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);

// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
    var delay = initialDelay;
    var retry = 0;
    var closure = function() {
        return task().catch(function(error) {
            retry++;
            if (retry > maxRetry) {
                throw error
            }
            var promise = Promise.delay(delay).then(closure);
            delay = Math.min(delay * 2, maxDelay);
            return promise;
        })
    };
    return closure();
}

var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];

// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product) {
    // closure used to try to start a task
    var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
            xml.pause();
            suspended = true;
            console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
            // if we have a buffered task, start it
            // if not, resume the xml stream
            if (buffer.length > 0) {
                buffer.shift()();
            } else if (!stopped) {
                try {
                    xml.resume();
                    suspended = false;
                    console.log("stream resumed");
                } catch (e) {
                    // the only way to know if you've reached the end of the stream
                    // xml.on('end') can be triggered BEFORE all handlers are called
                    // probably a bug of xml-stream
                    stopped = true;
                    console.log("stream end");
                }
            }
        }
    };

    // push the task to the buffer
    buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
            return writeDataToDb(product)
        }, 100, 2000, 100).finally(function() {
            currentPressure--;
            // a task has just finished, let try to run a new one
            tryStartTask();
        });
    });

    // we've just buffered a task, let try to run it
    tryStartTask();
}

// write the product to database here :)
function writeDataToDb(product) {
    // the following code is here to create random delays and random failures (just for testing)
    var timeToWrite = Math.random() * 100;
    var failure = Math.random() > 0.5;
    return Promise.delay(timeToWrite).then(function() {
        if (failure) {
            throw new Error();
        }
        return null;
    })
}

xml.on('endElement: product', writeXmlDataWithBackPressure);

Поиграйте с ним, поместите немного console.log, чтобы понять, как он себя ведет. Надеюсь, это поможет вам решить вашу проблему:)