Подтвердить что ты не робот

Передача большого массива в дочерний процесс node

У меня сложная работа с интенсивным процессором, которую я хочу делать на большом массиве. В идеале я хотел бы передать это дочернему процессу.

var spawn = require('child_process').spawn;

// dataAsNumbers is a large 2D array
var child = spawn(process.execPath, ['/child_process_scripts/getStatistics', dataAsNumbers]);

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

Но когда я это делаю, node дает ошибку:

spawn E2BIG

Я столкнулся с этой статьей

Таким образом, передача данных в дочерний процесс, по-видимому, является способом выхода. Теперь мой код:

var spawn = require('child_process').spawn;

console.log('creating child........................');

var options = { stdio: [null, null, null, 'pipe'] };
var args = [ '/getStatistics' ];
var child = spawn(process.execPath, args, options);

var pipe = child.stdio[3];

pipe.write(Buffer('awesome'));

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

И затем в getStatistics.js:

console.log('im inside child');

process.stdin.on('data', function(data) {
  console.log('data is ', data);
  process.exit(0);
});

Однако обратный вызов в process.stdin.on не достигнут. Как я могу получить поток в моем дочернем script?

ИЗМЕНИТЬ

Мне пришлось отказаться от буферного подхода. Теперь я отправляю массив как сообщение:

var cp = require('child_process');
var child = cp.fork('/getStatistics.js');

child.send({ 
  dataAsNumbers: dataAsNumbers
});

Но это работает только тогда, когда длина dataAsNumber составляет менее 20 000, в противном случае время истекает.

4b9b3361

Ответ 1

С таким огромным количеством данных я бы рассмотрел использование разделяемой памяти, а не копирование данных в дочерний процесс (это то, что происходит, когда вы используете канал или передаете сообщения). Это позволит сэкономить память, сократить время процессора для родительского процесса и вряд ли ударит в какой-то предел.

shm-typed-array - очень простой модуль, который подходит для вашего приложения. Пример:

parent.js

"use strict";

const shm = require('shm-typed-array');
const fork = require('child_process').fork;

// Create shared memory
const SIZE = 20000000;
const data = shm.create(SIZE, 'Float64Array');

// Fill with dummy data
Array.prototype.fill.call(data, 1);

// Spawn child, set up communication, and give shared memory
const child = fork("child.js");
child.on('message', sum => {
    console.log(`Got answer: ${sum}`);

    // Demo only; ideally you'd re-use the same child
    child.kill();
});
child.send(data.key);

child.js

"use strict";

const shm = require('shm-typed-array');

process.on('message', key => {
    // Get access to shared memory
    const data = shm.get(key, 'Float64Array');

    // Perform processing
    const sum = Array.prototype.reduce.call(data, (a, b) => a + b, 0);

    // Return processed data
    process.send(sum);
});

Обратите внимание, что мы отправляем небольшой "ключ" из родительского дочернего процесса через IPC, а не все данные. Таким образом, мы сохраняем тонну памяти и времени.

Конечно, вы можете изменить 'Float64Array' (например, a double) на любой типизированный массив, который требуется вашему приложению. Обратите внимание, что эта библиотека, в частности, обрабатывает только одномерные типизированные массивы; но это должно быть лишь незначительным препятствием.

Ответ 2

Я тоже смог воспроизвести задержку, которую вы испытывали, но, возможно, не так плох, как вы. Я использовал следующие

// main.js
const fork = require('child_process').fork

const child = fork('./getStats.js')

const dataAsNumbers = Array(100000).fill(0).map(() =>
  Array(100).fill(0).map(() => Math.round(Math.random() * 100)))

child.send({
  dataAsNumbers: dataAsNumbers,
})

И

// getStats.js
process.on('message', function (data) {
  console.log('data is ', data)
  process.exit(0)
})

node main.js 2.72s пользователь 0.45s система 103% cpu 3.045 всего

Я генерирую 100k элементов, состоящих из 100 номеров, чтобы издеваться над вашими данными, убедитесь, что вы используете событие message на process. Но, возможно, ваши дети более сложны и могут быть причиной сбоя, также зависит от тайм-аута, который вы задали в своем запросе.


Если вы хотите получить лучшие результаты, то что вы можете сделать, это разбить ваши данные на несколько частей, которые будут отправлены дочернему процессу и восстановлены для формирования начального массива.


Одной из возможностей может быть использование сторонней библиотеки или протокола, даже если это немного больше работает. Вы могли бы взглянуть на messenger.js или даже что-то вроде очереди AMQP, которая могла бы позволить вам обмениваться данными между двумя процессами с пулом и гарант сообщения был подтвержден подпроцессом. Существует несколько реализаций node, таких как amqp.node, но для этого потребуется немного работы по настройке и настройке.

Ответ 3

Используйте кеш в памяти, например https://github.com/ptarjan/node-cache, и пусть родительский процесс хранит содержимое массива с помощью некоторого ключа, дочерний процесс будет потяните содержимое через этот ключ.

Ответ 4

Вы можете подумать о том, как использовать OS-каналы вы найдете здесь gist здесь в качестве вклада в дочернее приложение node.

Я знаю, что это не совсем то, о чем вы просите, но вы можете использовать модуль cluster (включен в node). Таким образом, вы можете получить столько случаев, сколько ядер, которые вы должны ускорить обработку. Кроме того, рассмотрите возможность использования потоков, если вам не нужно иметь все данные, доступные до начала обработки. Если обрабатываемые данные слишком велики, я бы сохранил их в файле, чтобы вы могли повторно заново использовать, если во время процесса возникла какая-либо ошибка. Вот пример кластеризации.

var cluster = require('cluster');
var numCPUs = 4;

if (cluster.isMaster) {
    for (var i = 0; i < numCPUs; i++) {
        var worker = cluster.fork();
        console.log('id', worker.id)
    }
} else {
    doSomeWork()
}

function doSomeWork(){
    for (var i=1; i<10; i++){
        console.log(i)
    }
}

Больше информации, отправляющей сообщения через рабочих вопрос 8534462.

Ответ 5

Почему вы хотите сделать подпроцесс? Посылка данных через подпроцессы, скорее всего, будет стоить больше с точки зрения процессора и реального времени, чем вы сэкономите в том, чтобы сделать обработку в рамках одного процесса.

Вместо этого я бы предположил, что для суперэффективного кодирования вы считаете, что ваши статистические вычисления в рабочем потоке выполняются в той же памяти, что и основной процесс nodejs.

Вы можете использовать NAN для написания кода на С++, который вы можете отправить в рабочий поток, а затем иметь этот рабочий поток для после завершения отправьте результат и событие обратно в цикл событий nodejs.

Преимущество этого в том, что вам не нужно дополнительное время для передачи данных в другой процесс, но недостатком является то, что вы напишете немного кода на С++ для действия с потоком, но расширение NAN должно принять заботиться о большинстве трудных задач для вас.

Ответ 6

Для длительных задач процесса вы можете использовать что-то вроде gearman. Вы можете сделать тяжелый рабочий процесс для рабочих, таким образом, вы можете настройте сколько работников вам нужно, например, я делаю некоторую обработку файлов таким образом, если мне нужно масштабировать, вы создаете больше экземпляра рабочего, также у меня есть разные работники для разных задач, обрабатывайте zip файлы, создавайте эскизы и т.д., польза от этого рабочие могут быть записаны на любом языке node.js, Java, python и могут быть легко интегрированы в ваш проект.

// worker-unzip.js
const debug = require('debug')('worker:unzip');
const {series, apply} = require('async');
const gearman = require('gearmanode');
const {mkdirpSync} = require('fs-extra');
const extract = require('extract-zip');

module.exports.unzip = unzip;
module.exports.worker = worker;

function unzip(inputPath, outputDirPath, done) {
  debug('unzipping', inputPath, 'to', outputDirPath);
  mkdirpSync(outputDirPath);
  extract(inputPath, {dir: outputDirPath}, done);
}


/**
 *
 * @param {Job} job
 */
function workerUnzip(job) {
  const {inputPath, outputDirPath} = JSON.parse(job.payload);
  series([
    apply(unzip, inputPath, outputDirPath),
    (done) => job.workComplete(outputDirPath)
  ], (err) => {
    if (err) {
      console.error(err);
      job.reportError();
    }
  });
}

function worker(config) {
  const worker = gearman.worker(config);
  if (config.id) {
    worker.setWorkerId(config.id);
  }

  worker.addFunction('unzip', workerUnzip, {timeout: 10, toStringEncoding: 'ascii'});
  worker.on('error', (err) => console.error(err));

  return worker;
}

простой index.js

const unzip = require('./worker-unzip').worker;

unzip(config); // pass host and port of the Gearman server

Обычно я запускаю рабочих с PM2

интеграция с вашим кодом очень проста. что-то вроде

//initialize
const gearman = require('gearmanode');

gearman.Client.logger.transports.console.level = 'error';
const client = gearman.client(configGearman); // same host and port

просто добавьте работу в очередь, передав имя функции

const taskpayload = {inputPath: '/tmp/sample-file.zip', outputDirPath: '/tmp/unzip/sample-file/'}
const job client.submitJob('unzip', JSON.stringify(taskpayload));
job.on('complete', jobCompleteCallback);
job.on('error', jobErrorCallback);