Подтвердить что ты не робот

Масштабирование Socket.IO для нескольких процессов Node.js с использованием кластера

Разрывая мои волосы с этим... кому-то удалось масштабировать Socket.IO на несколько "рабочих" процессов, порожденных Node.js cluster?

Предположим, что у меня есть следующие рабочие процессы четыре (псевдо):

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

И в браузере...

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

Проблема: Каждая вторая, я получаю сообщения четыре, из-за четырех отдельных рабочих процессов, отправляющих сообщения.

Как обеспечить, чтобы сообщение отправлялось только один раз?

4b9b3361

Ответ 1

Изменить: В Socket.IO 1.0+ вместо установки хранилища с несколькими клиентами Redis теперь можно использовать более простой модуль адаптера Redis.

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

Ниже показан пример, показанный ниже:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

Если у вас есть мастер node, который должен публиковаться в других процессах Socket.IO, но сам не принимает соединения сокетов, используйте socket.io-emitter вместо socket.io-redis.

Если у вас возникли проблемы с масштабированием, запустите приложения node с помощью DEBUG=*. Socket.IO теперь реализует debug, который также будет распечатывать отладочные сообщения адаптера Redis. Пример вывода:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

Если оба основных и дочерних процесса отображают одинаковые сообщения парсера, ваше приложение правильно масштабируется.


Не должно быть проблем с вашей установкой, если вы излучаете одного работника. То, что вы делаете, исходит от всех четырех рабочих, и из-за публикации/подписки Redis сообщения не дублируются, а четыре раза записываются, как вы просили приложение сделать. Здесь простая схема того, что делает Редис:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

Как вы можете видеть, когда вы выходите из рабочего, он опубликует emit для Redis, и он будет отражен от других работников, которые подписались на базу данных Redis. Это также означает, что вы можете использовать несколько серверов сокетов, подключенных к одному и тому же экземпляру, и на всех подключенных серверах будет запущен излучение на одном сервере.

С кластером, когда клиент подключается, он будет подключаться к одному из ваших четырех сотрудников, а не ко всем четырем. Это также означает, что все, что вы испускаете от этого работника, будет отображаться только одному клиенту. Итак, да, приложение масштабируется, но, как вы это делаете, вы испускаете всех четырех рабочих, а база данных Redis делает это так, как если бы вы звонили ему четыре раза на одного работника. Если клиент действительно подключился ко всем четырем экземплярам вашего сокета, они получат шестнадцать сообщений в секунду, а не четыре.

Тип обработки сокетов зависит от типа приложения, которое вы собираетесь использовать. Если вы собираетесь обрабатывать клиентов по отдельности, тогда у вас не должно быть проблем, потому что событие соединения будет срабатывать только для одного рабочего на одного клиента. Если вам нужно глобальное "сердцебиение", тогда у вас может быть обработчик сокета в вашем основном процессе. Поскольку работники умирают, когда мастер-процесс умирает, вы должны компенсировать нагрузку на соединение основного процесса и позволить дочерним элементам обрабатывать соединения. Вот пример:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

В примере есть пять экземпляров Socket.IO, один из которых является мастером, а четыре - дочерними. Главный сервер никогда не вызывает listen(), поэтому на этот процесс нет никаких дополнительных затрат на соединение. Однако, если вы вызываете emit в основном процессе, он будет опубликован в Redis, и четыре рабочих процесса будут выполнять emit на своих клиентах. Это компенсирует нагрузку на соединение для рабочих, и если работник должен умереть, ваша основная логика приложения будет не затронута в главном.

Обратите внимание, что с помощью Redis все испускает, даже в пространстве имен или в комнате, будут обрабатываться другими рабочими процессами, как если бы вы инициировали испускание этого процесса. Другими словами, если у вас есть два экземпляра Socket.IO с одним экземпляром Redis, вызов emit() в сокете первого работника отправит данные своим клиентам, а рабочий два будет делать так же, как если бы вы вызвали emit из этот рабочий.

Ответ 2

Пусть мастер обрабатывает ваше сердцебиение (пример ниже) или запускает несколько процессов на разных портах внутри и загружает их с помощью nginx (который поддерживает также websockets от V1.3 вверх).

Кластер с мастером

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {
    socket.on('join', function(rooms) {
        rooms.forEach(function(room) {
            socket.join(room);
        });
    });

    socket.on('leave', function(rooms) {
        rooms.forEach(function(room) {
            socket.leave(room);
        });
    });

});

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Emit a message every second
    function send() {
        console.log('howdy');
        io.sockets.in('room').emit('data', 'howdy');
    }

    setInterval(send, 1000);


    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    }); 
}

Ответ 3

На самом деле это выглядит как Socket.IO, способное масштабироваться. Вы ожидаете, что сообщение с одного сервера будет отправлено во все сокеты в этой комнате, независимо от того, к какому серверу они подключены.

Лучше всего иметь один мастер-процесс, который отправляет сообщение каждую секунду. Вы можете сделать это, только запустив его, если cluster.isMaster, например.