В рамках моего исследования я пишу эхо-сервер с высокой нагрузкой TCP/IP в Java. Я хочу обслуживать около 3-4 тыс. Клиентов и просматривать максимально возможные сообщения в секунду, которые я могу выжать из него. Размер сообщения довольно мал - до 100 байт. Эта работа не имеет практической цели - только исследования.
В соответствии с многочисленными презентациями, которые я видел (тесты HornetQ, переговоры LMAX Disruptor и т.д.), системы высокой нагрузки реального мира имеют тенденцию обслуживать миллионы транзакций в секунду (я считаю, что Disruptor упомянул около 6 мил и Hornet - 8.5). Например, этот пост утверждает, что можно достичь до 40M MPS. Поэтому я воспринял это как приблизительную оценку того, на что способен современное оборудование.
Я написал простейший однопоточный NIO-сервер и запустил тест нагрузки. Я был немного удивлен, что могу получить только около 100 тыс. MPS на localhost и 25 тыс. С реальной сетью. Номера выглядят довольно маленькими. Я тестировал Win7 x64, ядро i7. Глядя на загрузку процессора - занято только одно ядро (что ожидается в однопоточном приложении), а остальные сидят без дела. Однако, даже если я загружу все 8 ядер (включая виртуальные), у меня будет не более 800k MPS - даже не до 40 миллионов:)
Мой вопрос: что такое типичный шаблон для подачи огромного количества сообщений клиентам? Должен ли я распределять сетевую нагрузку на несколько разных сокетов внутри одной JVM и использовать какой-то балансировщик нагрузки, например HAProxy, для распределения нагрузки на несколько ядер? Или я должен смотреть на использование нескольких селекторов в моем коде NIO? Или, может быть, даже распределить нагрузку между несколькими JVM и использовать Chronicle для создания межпроцессного взаимодействия между ними? Будет ли тестирование на правильной серверной ОС, такой как CentOS, иметь большое значение (может быть, Windows замедляет работу)?
Ниже приведен пример кода моего сервера. Он всегда отвечает "ok" на любые входящие данные. Я знаю, что в реальном мире мне нужно будет отслеживать размер сообщения и быть готовым к тому, что одно сообщение может быть разделено между несколькими чтениями, но я бы хотел, чтобы все было просто супер.
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
this.port = port;
selector = initSelector();
loop();
}
private void loop() {
while (true) {
try{
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
key.cancel();
socketChannel.close();
System.out.println("Forceful shutdown");
return;
}
if (numRead == -1) {
System.out.println("Graceful shutdown");
key.channel().close();
key.cancel();
return;
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
numMessages++;
if (numMessages%100000 == 0) {
long elapsed = System.currentTimeMillis() - loopTime;
loopTime = System.currentTimeMillis();
System.out.println(elapsed);
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
socketChannel.write(dummyResponse);
if (dummyResponse.remaining() > 0) {
System.err.print("Filled UP");
}
key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) throws IOException {
System.out.println("Starting echo server");
new EchoServer();
}
}