Подтвердить что ты не робот

Как записать живой набор данных на диск с асинхронным вводом-выводом?

Я новичок в разработке в node.js(хотя и относительно опытный на клиентском javascript), и у меня много вопросов о хороших практиках при работе с асинхронными операциями в node.js.

Моя конкретная проблема (хотя я думаю, что это довольно общая тема) заключается в том, что у меня есть приложение node.js(работает на малине Pi), которое записывает показания от нескольких датчиков температуры каждые 10 секунд до в структуре данных памяти. Это прекрасно работает. Данные накапливаются с течением времени в памяти и, поскольку они накапливаются и достигают определенного порога размера, данные регулярно стареют (сохраняя только последние N дней данных), чтобы он не превышал определенный размер. Эти данные температуры используются для управления некоторыми другими устройствами.

Затем у меня есть отдельный таймер с интервалом, который так часто записывает эти данные на диск (чтобы сохранить его, если процесс выходит из строя). Я использую async node.js(fs.open(), fs.write() и fs.close()) disk IO для записи данных на диск.

И, из-за асинхронного характера дискового ввода-вывода, мне приходит в голову, что сама структура данных, которую я пытаюсь записать на диск, может быть модифицирована прямо посреди меня, записывая ее на диск. Возможно, это будет плохо. Если данные добавляются только к структуре данных при записи на диск, это фактически не вызовет проблемы с тем, как я пишу данные, но есть некоторые обстоятельства, когда более ранние данные могут быть изменены при записи новых данных и это действительно будет бесполезно с целостностью того, что я нахожусь в середине записи на диск.

Я могу придумать всевозможные несколько уродливых гарантий, которые я мог бы добавить в свой код, например:

  • Переключитесь на синхронный ввод-вывод, чтобы записать данные на диск (на самом деле не хотите делать это для ответа на сервер).
  • Установите флаг, когда я начал записывать данные и не записывал никаких новых данных, пока этот флаг установлен (заставляет меня потерять запись данных во время записи).
  • Более сложные версии опции 2, где я устанавливаю флаг, и когда установлен флаг, новые данные поступают в отдельную временную структуру данных, которая, когда файл IO выполняется, затем объединяется с реальными данными (выполнимо, но кажется некрасиво).
  • Сделайте копию моментальных копий исходных данных и не спешите писать эту копию на диск, зная, что никто не будет изменять копию. Я не хочу этого делать, потому что набор данных относительно велик, и я в ограниченной среде памяти (малиновый PI).

Итак, мой вопрос заключается в том, какие шаблоны проектирования для записи большого набора данных с асинхронным IO, когда другие операции могут захотеть изменить эти данные во время асинхронного ввода-вывода? Существуют ли более общие способы решения моей проблемы, чем конкретные описанные выше работы?

4b9b3361

Ответ 1

Ваша проблема синхронизация данных. Традиционно это решается с помощью locks/mutexes, но javascript/ node на самом деле не имеет ничего подобного встроенному.

Итак, как мы решаем это в node? Мы используем очереди. Лично я использую функцию queue из асинхронный модуль.

Очереди работают, сохраняя список задач, которые необходимо выполнить, и выполняйте эти задачи только в том порядке, в котором они добавлены в очередь, после завершения предыдущей задачи (аналогично вашему варианту 3).

queue animation

Примечание. Метод очереди асинхронного модуля может фактически запускать несколько задач одновременно (например, анимация выше показывает), но, поскольку мы говорим о синхронизации данных здесь, мы этого не хотим. К счастью, мы можем сказать, чтобы он просто запускал по одному.

В вашей конкретной ситуации то, что вы хотите сделать, это настроить очередь, которая может выполнять два типа задач:

  • Изменить структуру данных
  • Напишите вашу структуру данных на диск

Всякий раз, когда вы получаете новые данные от ваших температурных зондов, добавьте задачу в свою очередь, чтобы изменить структуру данных с помощью этих новых данных. Затем, когда срабатывает ваш интервальный таймер, добавьте задачу в свою очередь, которая записывает вашу структуру данных на диск.

Поскольку очередь будет запускать только одну задачу за раз, в том порядке, в котором они добавлены в очередь, она гарантирует, что вы никогда не будете изменять свою структуру данных в памяти во время записи данных на диск.

Очень простая реализация может выглядеть так:

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        })
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
funcion addNewData(data) {
    dataQueue.push({task: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 18000);

Также обратите внимание, что теперь вы можете добавлять данные в свою структуру данных асинхронно. Скажем, вы добавляете новый зонд, который запускает событие, когда изменяется его значение. Вы можете просто addNewData(data), как и в случае с существующими пробниками, и не беспокоиться о том, что это противоречит изменениям в процессе или записи на диск (это действительно происходит, если вы начинаете писать в базу данных, а не в хранилище данных в памяти).


Обновление: Более элегантная реализация с использованием bind()

Идея состоит в том, что вы используете bind() для привязки аргументов к функции, а затем нажмите новую связанную функцию, которая возвращает bind() в очередь. Таким образом, вам не нужно вставлять какой-либо пользовательский объект в очередь, которую он должен интерпретировать; вы можете просто дать ему функцию для вызова, все настройки с правильными аргументами уже есть. Единственное предостережение в том, что функция должна принимать обратный вызов в качестве последнего аргумента.

Это должно позволить вам использовать все существующие функции (возможно, с небольшими изменениями) и просто вставлять их в очередь, когда вам нужно убедиться, что они не запускаются одновременно.

Я собрал это вместе, чтобы проверить концепцию:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()}
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    })
}, 1000)

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat"
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);

Ответ 2

Сначала - покажите практическое решение, а затем дайте понять, как и почему он работает:

var chain = Promise.resolve(); // Create a resolved promise
var fs = Promise.promisifyAll(require("fs"));

chain = chain.then(function(){
    return fs.writeAsync(...); // A
});

// some time in the future
chain = chain.then(function(){
    return fs.writeAsync(...); // This will always execute after A is done
})

Поскольку вы отметили свой вопрос с помощью promises - стоит упомянуть, что promises решить эту (довольно сложную) проблему очень хорошо самостоятельно и сделать это довольно легко.

Проблема синхронизации данных называется проблемой производителя. Есть много способов решить синхронизацию в JavaScript - этот недавний фрагмент Q KrisKowal является хорошим показанием по этому вопросу.

Введите: Promises

Самый простой способ решить это с помощью promises - это объединить все с помощью единого обещания. Я знаю, что вы опытны с promises самостоятельно, но для более свежих читателей давайте вспомнить:

Promises являются абстракцией над понятием самого секвенирования. Обещание - это единый (читаемый дискретный) блок действий. Цепочка promises, очень похожая на ; на некоторых языках, отмечает конец одной операции и начало следующего. promises в JavaScript абстрактно две основные вещи - понятие действия, требующее времени и исключительных условий.

Здесь есть "более высокая" абстракция, называемая монадой, а A + promises не строго соблюдают законы монады (для удобства) существуют реализации promises. promises аннотация определенного вида обработки, где монады абстрагируют понятие самой обработки, вы можете сказать, что обещание - это монада или, по крайней мере, что они являются монадическими.

Promises начинаются как отложенные, означая, что они представляют действие, которое уже было запущено, но еще не завершено. В какой-то момент они могут пройти разрешение, в течение которых они устанавливают в одном из двух состояний:

  • Выполнено - указывает, что действие выполнено успешно.
  • Отклонено - указывает, что действие не завершено успешно.

Как только обещание будет разрешено, оно больше не может изменить свое состояние. Так же, как вы можете продолжить ; на следующей строке - вы можете продолжить обещание с помощью ключевого слова .then, которое привязывает предыдущее действие к следующему.

Решение производителя - потребителя.

Традиционное решение проблемы производителя/потребителя может быть выполнено с помощью традиционных конструкций concurrency, таких как семафоры Дийкстры. Действительно, такое решение существует через promises или простые обратные вызовы, но я считаю, что мы можем сделать что-то подобное.

Вместо этого мы будем поддерживать запущенную программу и каждый раз добавлять к ней новые действия.

var fsQueue = Promise.resolve(); // start a new chain

// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
    return fs.writeAsync(...); 
});

// some other place
fsQueue = fsQueue.then(function(){
    return fs.writeAsync(...);
});

Добавление действий в очередь гарантирует, что мы заказали синхронизацию, и действия будут выполняться только после того, как предыдущие завершены. Это самое простое решение для синхронизации этой проблемы и требует обертывания fs.asyncFunction вызовов .then их в вашу очередь.

Альтернативное решение будет использовать что-то похожее на "монитор" - мы можем гарантировать, что доступ согласован изнутри путем обертывания fs:

var fs = B.promisifyAll(require("fs")); // bluebird promisified fs 
var syncFs = { // sync stands for synchronized, not synchronous
    queue: B.resolve();
    writeAsync = function(){
        var args = arguments
        return (queue = queue.then( // only execute later
            return fs.writeAsync.apply(fs,arguments);
        });
    } // promisify other used functions similarly
};

Что создаст синхронизированные версии действий fs. Также возможно автоматизировать это (не проверено) с помощью чего-то подобного:

// assumes module is promisified and ignores nested functions
function synchronize(module){
    var ret = {}, queue = B.resolve();
    for(var fn in module){
        ret[fn] = function(){
            var args = arguments;
            queue = queue.then(function(){
                return module[fn].apply(module, args); 
            })
        };
    }
    ret.queue = queue; // expose the queue for handling errors
    return ret;
}

Какая должна быть версия модуля, которая синхронизирует все его действия. Обратите внимание, что мы получаем дополнительное преимущество: ошибки не подавляются, и файловая система не будет находиться в противоречивом состоянии, потому что действия не будут выполняться до тех пор, пока не будет обработана ошибка, из-за которой не будет выполнено действие.

Разве это не похоже на очередь?

Да! Очереди делают что-то очень похожее (что вы можете увидеть в другом ответе), предоставляя первую в первой структуре структуру действий. Очень похоже на программный код, который выполняется в этом порядке. promises - это просто более сильная сторона той же монеты, на мой взгляд.

Другой ответ также обеспечивает жизнеспособную возможность через очереди.

О предлагаемых подходах

Переключитесь на синхронный ввод-вывод, чтобы записать данные на диск (на самом деле не хотите делать это для ответа на сервер).

В то время как я согласен, что это самый простой - подход "монитора" для цепочки всех действий, которые необходимо синхронизировать в одной очереди, очень похож.

Установите флаг, когда я начал писать данные и не записывал никаких новых данных, пока этот флаг установлен (заставляет меня потерять запись данных во время записи).

Этот флаг эффективно является мьютеком. Если вы блокируете (или уступаете и помещаете действие в очередь), когда кто-то пытается сделать это, у вас есть реальный мьютекс, который содержит "гарантии мьютекса".

Повторная попытка с этим флагом и сохранение списка следующих действий для хранения флага на самом деле очень распространены в реализациях семафора - один пример находится в ядре linux.

Более сложные версии опции 2, где я устанавливаю флаг и когда установлен флаг, новые данные поступают в отдельную временную структуру данных, которая, когда файл IO выполняется, затем объединяется с реальными данными (выполнимо, но кажется некрасиво). Сделайте копию моментальных копий исходных данных и не спешите писать эту копию на диск, зная, что никто не будет изменять копию. Я не хочу этого делать, потому что набор данных относительно велик, и я в ограниченной среде памяти (малина PI).

Эти подходы обычно называются транзакционными обновлениями RCU, в некоторых случаях они очень современные и очень быстрые - например, для "проблемы писателей-писателей" (что очень похоже на то, что у вас есть). Родная поддержка для них ударила в ядре linux совсем недавно. Выполнение этого в определенных случаях на самом деле является жизнеспособным и эффективным, хотя в вашем случае это слишком сложно, как вы полагаете.

Итак, подведем итоги

  • Это непростая задача, но интересная.
  • К счастью, promises решить это довольно хорошо, они были построены именно для решения этой проблемы путем абстрагирования понятия последовательности.

Счастливое кодирование, проект Pi NodeJS звучит потрясающе. Дайте мне знать, если я смогу прояснить это дальше.