Подтвердить что ты не робот

Высокопроизводительный TCP-сервер в С#

Я - опытный разработчик С#, но пока не разработал приложение TCP-сервера. Теперь мне нужно разработать высоко масштабируемый и высокопроизводительный сервер, который может обрабатывать не менее 5-10 тысяч одновременных подключений: получение байтов-данных через GPRS с устройств GPS.

Общий процесс коммуникации должен выглядеть следующим образом:

  • Устройство GPS инициирует подключение к моему серверу
  • мой сервер отвечает, если я хочу получить данные
  • устройство отправляет данные GPS
  • мой сервер отправляет отчет на устройство о его получении (sg like checksum)
  • получение новых данных из GPS, reportm, и это происходит снова и снова.
  • позднее устройство GPS закрывает соединение

Итак, на моем сервере мне нужно

  • отслеживание подключенных/активных клиентов
  • закрыть любого клиента со стороны сервера
  • поймать событие, когда устройство закрывает соединение
  • получить данные байта
  • отправить данные клиентам

Я начал читать об этой теме через Интернет, но для меня это был кошмар. Есть много способов, но я не мог узнать, какой из них лучше.

Способы сокета Async кажутся мне лучше, но писать код в этом асинхронном стиле ужасно и нелегко отлаживать.

Итак, мой вопрос: какой, по вашему мнению, лучший способ реализовать высокопроизводительный TCP-сервер в С#? Знаете ли вы какой-либо хороший компонент с открытым исходным кодом для этого? (Я попробовал несколько, но я не мог найти хорошего.)

4b9b3361

Ответ 1

Он должен быть асинхронным, нет никакого способа обойти это. Высокая производительность и масштабируемость не смешиваются с одним потоком на один разъем. Вы можете посмотреть, что делают сами StackExchange, см. async Redis ждет BookSleeve, который использует функции CTP из следующей версии С# (так же работает край и подвержен изменениям, но это круто). Для еще большего количества кровотечений решения развиваются вокруг использования класса SocketAsyncEventArgs, что делает еще один шаг вперед, устраняя частые распределения асинхронных обработчиков, связанных с "классическим" С# асинхронная обработка:

Класс SocketAsyncEventArgs является частью набора улучшений для Класс System.Net.Sockets.Socket, который обеспечить альтернативный асинхронный шаблон, который может быть использован специализированный высокопроизводительный разъем Приложения. Этот класс был специально предназначенные для сети серверных приложений, требующих высокой представление. Приложение может использовать расширенный асинхронный шаблон исключительно или только в целевых горячих областей (например, при получении большие объемы данных).

Короче говоря: изучите асинхронные или умирающие попытки...

Кстати, если вы спрашиваете, почему async, а затем прочитайте три статьи, связанные с этим сообщением: Высокопроизводительные программы Windows. Окончательный ответ таков: базовый дизайн ОС требует его.

Ответ 2

Как говорит Ремус, вы должны использовать async, чтобы поддерживать высокую производительность. Это методы Begin.../End... в .NET.

Под капотом для сокетов эти методы используют порты ввода-вывода IO, которые, по-видимому, являются наиболее эффективным способом обработки многих сокетов в операционных системах Windows.

Как говорит Джим, класс TcpClient может помочь здесь и довольно прост в использовании. Ниже приведен пример использования TcpListener для прослушивания входящих подключений и TcpClient для их обработки, причем начальные вызовы BeginAccept и BeginRead являются асинхронными.

В этом примере предполагается, что для сокетов используется протокол на основе сообщений, и это исключено, за исключением того, что первые 4 байта каждой передачи являются длиной, но это позволяет использовать синхронное чтение в потоке, чтобы получить остальное данных, которые уже буферизованы.

Вот код:

class ClientContext
{
    public TcpClient Client;
    public Stream Stream;
    public byte[] Buffer = new byte[4];
    public MemoryStream Message = new MemoryStream();
}

class Program
{
    static void OnMessageReceived(ClientContext context)
    {
        // process the message here
    }

    static void OnClientRead(IAsyncResult ar)
    {
        ClientContext context = ar.AsyncState as ClientContext;
        if (context == null)
            return;

        try
        {
            int read = context.Stream.EndRead(ar);
            context.Message.Write(context.Buffer, 0, read);

            int length = BitConverter.ToInt32(context.Buffer, 0);
            byte[] buffer = new byte[1024];
            while (length > 0)
            {
                read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
                context.Message.Write(buffer, 0, read);
                length -= read;
            }

            OnMessageReceived(context);
        }
        catch (System.Exception)
        {
            context.Client.Close();
            context.Stream.Dispose();
            context.Message.Dispose();
            context = null;
        }
        finally
        {
            if (context != null)
                context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
    }

    static void OnClientAccepted(IAsyncResult ar)
    {
        TcpListener listener = ar.AsyncState as TcpListener;
        if (listener == null)
            return;

        try
        {
            ClientContext context = new ClientContext();
            context.Client = listener.EndAcceptTcpClient(ar);
            context.Stream = context.Client.GetStream();
            context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
        finally
        {
            listener.BeginAcceptTcpClient(OnClientAccepted, listener);
        }
    }

    static void Main(string[] args)
    {
        TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
        listener.Start();

        listener.BeginAcceptTcpClient(OnClientAccepted, listener);

        Console.Write("Press enter to exit...");
        Console.ReadLine();
        listener.Stop();
    }
}

Он демонстрирует, как обрабатывать асинхронные вызовы, но для этого потребуется обработать обработку ошибок, чтобы убедиться, что TcpListener всегда принимает новые подключения и больше обработки ошибок, когда клиенты неожиданно отключаются. Кроме того, похоже, что есть несколько случаев, когда не все данные поступают в один проход, которые также нуждаются в обработке.

Ответ 3

Вы можете сделать это с помощью класса TcpClient, хотя, честно говоря, я не знаю, можете ли вы открыть 10 тысяч Розетки. Это довольно много. Но я регулярно использую TcpClient для обработки десятков параллельных сокетов. И асинхронная модель на самом деле очень приятно использовать.

Ваша самая большая проблема не будет заключаться в работе TcpClient. С 10 тысячами параллельных соединений я думаю, что пропускная способность и масштабируемость будут проблемой. Я даже не знаю, может ли одна машина обрабатывать весь этот трафик. Я полагаю, это зависит от того, насколько велики пакеты и как часто они входят. Но вам лучше сделать некоторую предварительную оценку конверта, прежде чем совершить реализацию этого всего на одном компьютере.

Ответ 4

Я думаю, вы также ищете методы UDP. Для клиентов 10k это быстро, но проблема в том, что вам нужно реализовать подтверждение для каждого сообщения, которое вы получили. В UDP вам не нужно открывать сокет для каждого клиента, но ему нужно реализовать механизм heartbeat/ping через x секунд, чтобы проверить, какой клиент подключен или нет.

Ответ 5

Вы можете использовать мой TCP CSharpServer, который я сделал, его очень просто реализовать, просто выполните IClientRequest в одном из ваших классов.

using System;
using System.Collections.Generic;
using System.Linq;

namespace cSharpServer
{
    public interface IClientRequest
    {        
        /// <summary>
        /// this needs to be set, otherwise the server will not beable to handle the request.
        /// </summary>
        byte IdType { get; set; } // This is used for Execution.
        /// <summary>
        /// handle the process by the client.
        /// </summary>
        /// <param name="data"></param>
        /// <param name="client"></param>
        /// <returns></returns>
        byte[] Process(BinaryBuffer data, Client client);
    }
}

BinaryBuffer позволяет вам легко читать данные, отправленные на сервер.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;

namespace cSharpServer
{
    public class BinaryBuffer
    {
        private const string Str0001 = "You are at the End of File!";
        private const string Str0002 = "You are Not Reading from the Buffer!";
        private const string Str0003 = "You are Currenlty Writing to the Buffer!";
        private const string Str0004 = "You are Currenlty Reading from the Buffer!";
        private const string Str0005 = "You are Not Writing to the Buffer!";
        private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
        private bool _inRead;
        private bool _inWrite;
        private List<byte> _newBytes;
        private int _pointer;
        public byte[] ByteBuffer;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public override string ToString()
        {
            return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(string data)
            : this(Helper.DefaultEncoding.GetBytes(data))
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer()
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(byte[] data)
            : this(ref data)
        {
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(ref byte[] data)
        {
            ByteBuffer = data;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementPointer(int add)
        {
            if (add < 0)
            {
                throw new Exception(Str0006);
            }
            _pointer += add;
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int GetPointer()
        {
            return _pointer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(ref byte[] buffer)
        {
            return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(byte[] buffer)
        {
            return GetString(ref buffer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginWrite()
        {
            if (_inRead)
            {
                throw new Exception(Str0004);
            }
            _inWrite = true;

            _newBytes = new List<byte>();
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(float value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.Add(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(int value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }

            _newBytes.AddRange(BitConverter.GetBytes(value));
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(long value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = new byte[8];

            unsafe
            {
                fixed (byte* bytePointer = byteArray)
                {
                    *((long*)bytePointer) = value;
                }
            }

            _newBytes.AddRange(byteArray);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int UncommitedLength()
        {
            return _newBytes == null ? 0 : _newBytes.Count;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void WriteField(string value)
        {
            Write(value.Length);
            Write(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(string value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
            _newBytes.AddRange(byteArray);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(decimal value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            int[] intArray = decimal.GetBits(value);

            Write(intArray[0]);
            Write(intArray[1]);
            Write(intArray[2]);
            Write(intArray[3]);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetInt(int value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetLong(long value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte[] value)
        {
            Write(ref value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(ref byte[] value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(value);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndWrite()
        {
            if (ByteBuffer != null)
            {
                _newBytes.InsertRange(0, ByteBuffer);
            }
            ByteBuffer = _newBytes.ToArray();
            _newBytes = null;
            _inWrite = false;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndRead()
        {
            _inRead = false;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginRead()
        {
            if (_inWrite)
            {
                throw new Exception(Str0003);
            }
            _inRead = true;
            _pointer = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte ReadByte()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
            return ByteBuffer[_pointer++];
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int ReadInt()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(4))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 4;

            return BitConverter.ToInt32(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float[] ReadFloatArray()
        {
            float[] dataFloats = new float[ReadInt()];
            for (int i = 0; i < dataFloats.Length; i++)
            {
                dataFloats[i] = ReadFloat();
            }
            return dataFloats;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float ReadFloat()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(sizeof(float)))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += sizeof(float);

            return BitConverter.ToSingle(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public decimal ReadDecimal()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(16))
            {
                throw new Exception(Str0001);
            }
            return new decimal(new[] { ReadInt(),
                ReadInt(),
                ReadInt(),
                ReadInt()
            });
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public long ReadLong()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(8))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 8;

            return BitConverter.ToInt64(ByteBuffer, startPointer);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public string ReadString(int size)
        {
            return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte[] ReadByteArray(int size)
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(size))
            {
                throw new Exception(Str0001);
            }
            byte[] newBuffer = new byte[size];

            Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);

            _pointer += size;

            return newBuffer;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool EofBuffer(int over = 1)
        {
            return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
        }
    }
}

Полный проект находится на GitHub CSharpServer