Подтвердить что ты не робот

С# Begin/EndReceive - как читать большие данные?

При чтении данных в кусках, скажем, 1024, как мне продолжать читать из сокета, который получает сообщение размером более 1024 байт, пока не осталось данных? Должен ли я просто использовать BeginReceive только для чтения префикса длины пакета, а затем как только он будет извлечен, используйте Receive() (в потоке async), чтобы прочитать остальную часть пакета? Или есть другой способ?

изменить

Я думал, что ссылка на Jon Skeet имеет решение, но есть немного паролей с этим кодом. Используемый мной код:

public class StateObject
{
    public Socket workSocket = null;
    public const int BUFFER_SIZE = 1024;
    public byte[] buffer = new byte[BUFFER_SIZE];
    public StringBuilder sb = new StringBuilder();
}

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);

    if (read > 0) 
    {
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (read == StateObject.BUFFER_SIZE)
        {
            s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                    new AyncCallback(Async_Send_Receive.Read_Callback), so);
            return;
        }
    }

    if (so.sb.Length > 0)
    {
        //All of the data has been read, so displays it to the console
        string strContent;
        strContent = so.sb.ToString();
        Console.WriteLine(String.Format("Read {0} byte from socket" + 
        "data = {1} ", strContent.Length, strContent));
    }
    s.Close();
}

Теперь это исправление отлично работает отлично, но оно терпит неудачу, когда размер пакета кратен буферу. Причина этого заключается в том, что буфер заполняется при чтении, предполагается, что имеется больше данных; но та же проблема возникает, как и раньше. 2-байтовый буфер, для exmaple, заполняется дважды в 4-байтовом пакете и предполагает, что есть больше данных. Затем он блокируется, потому что читать нечего. Проблема заключается в том, что функция приема не знает, когда заканчивается пакет.


Это заставило меня подумать о двух возможных решениях: я мог либо иметь разделитель конца пакета, либо я мог бы прочитать заголовок пакета, чтобы найти длину, а затем получить именно эту сумму (как я изначально предложил).

Есть проблемы с каждым из них. Мне не нравится идея использования разделителя, так как пользователь может каким-то образом работать в пакете во входной строке из приложения и вкручивать его. Это также просто кажется мне неаккуратным.

Заголовок длины звучит нормально, но я планирую использовать протокольные буферы - я не знаю формат данных. Есть ли длина заголовка? Сколько байтов это? Будет ли это то, что я реализую сам? Etc..

Что мне делать?

4b9b3361

Ответ 1

Нет - вызовите BeginReceive снова из обработчика обратного вызова, пока EndReceive не вернет 0. В принципе, вы должны продолжать получать асинхронно, считая, что хотите получить максимальную выгоду от асинхронного ввода-вывода.

Если вы посмотрите на странице MSDN для Socket.BeginReceive, вы увидите пример этого. (По общему признанию, это не так просто, как могло бы быть.)

Ответ 2

Dang. Я не решаюсь даже ответить на это, учитывая, что сановники уже взвесили, но здесь идет. Будьте нежны, о Великие!

Не имея возможности прочитать блог Marc (он заблокирован здесь из-за корпоративной интернет-политики), я собираюсь предложить "по-другому".

Трюк, на мой взгляд, , чтобы отделить получение данных от обработки этих данных.

Я использую класс StateObject, определенный следующим образом. Он отличается от реализации MSDN StateObject тем, что он не включает объект StringBuilder, константа BUFFER_SIZE является частной и включает конструктор для удобства.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

У меня также есть класс Packet, который является просто оболочкой вокруг буфера и временной метки.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

Моя функция ReceiveCallback() выглядит так.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

        if (read > 0) {
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();
        }

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

Обратите внимание, что эта реализация абсолютно не обрабатывает полученные данные и не имеет ожиданий относительно того, сколько байтов должно быть получено. Он просто получает все данные, находящиеся в сокете (до 65535 байт), и сохраняет эти данные в списке пакетов, а затем сразу же приостанавливает очередное асинхронное получение.

Поскольку в потоке, который обрабатывает каждый асинхронный прием, больше не обрабатывается, данные, очевидно, будут обрабатываться другим потоком, поэтому операция Add() синхронизируется с помощью оператора блокировки. Кроме того, поток обработки (будь то основной поток или какой-либо другой выделенный поток) должен знать , когда есть данные для обработки. Для этого я обычно использую ManualResetEvent, что и было показано выше.

Вот как работает обработка.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List<Packet> packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List<Packet>();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

Это базовая инфраструктура, которую я использую для всей моей связи сокетов. Он обеспечивает хорошее разделение между получением данных и обработкой этих данных.

Что касается другого вопроса, который у вас был, важно помнить при таком подходе, что каждый экземпляр пакета необязательно представляет собой полное сообщение в контексте вашего приложения. Экземпляр пакета может содержать частичное сообщение, одно сообщение или несколько сообщений, а ваши сообщения могут охватывать несколько экземпляров пакета. Я обратился к тому, как узнать, когда вы получили полное сообщение по связанному с вами вопросу здесь.

Ответ 3

Сначала вы должны прочитать префикс длины. После этого вы просто продолжаете читать байты в блоках (и вы можете сделать это async, как вы догадались), пока не исчерпаете количество байтов, которые, как вы знаете, приходят с провода.

Обратите внимание, что в какой-то момент при чтении последнего блока вы не захотите читать полные 1024 байта, в зависимости от того, что префикс длины говорит, что это сумма, и сколько байтов вы прочитали.

Ответ 4

Кажется, что в этом много путаницы. Примеры на веб-сайте MSDN для связи с асинхронным сокетом с использованием TCP являются вводящими в заблуждение и недостаточно объяснены. Вызов EndReceive действительно блокирует, если размер сообщения является точным кратным буфера приема. Это приведет к тому, что вы никогда не получите сообщение и приложение для зависания.

Просто чтобы очистить вещи - вы должны предоставить свой собственный разделитель для данных, если используете TCP. Прочтите следующее (это из ОЧЕНЬ надежного источника).

Потребность в данных приложения Разграничение

Другое влияние обработки TCP входящие данные как поток - это данные полученных приложением, использующим TCP неструктурирован. Для передачи поток данных переходит в TCP на одном устройства и при приеме, поток данные возвращаются в приложение на принимающее устройство. Хотя поток разбит на сегменты для передача по TCP, эти сегменты детали, которые скрыты на уровне TCP из приложения. Итак, когда устройство хочет отправить несколько частей данных, TCP не предоставляет механизма для указывая, где "разделительная линия" между кусками, поскольку TCP не рассматривает значение данных вообще. Приложение должно предоставляют средства для этого.

Рассмотрим, например, приложение который отправляет записи базы данных. Это необходимо передать запись № 579 из Таблица базы данных сотрудников, за которой следует запись № 581 и запись № 611. Он отправляет эти записи в TCP, который обрабатывает их все вместе как поток байт. TCP будет упаковывать эти байты в сегменты, но приложение не может предсказать. это возможно, что каждый окажется в другой сегмент, но скорее все они будут в одном сегменте или часть каждого из них будет сегментов, в зависимости от их длины. Сами записи должны иметь некоторые типа явных маркеров, поэтому принимающее устройство может определить, где заканчивается запись и начинается следующее.

Источник: http://www.tcpipguide.com/free/t_TCPDataHandlingandProcessingStreamsSegmentsandSequ-3.htm

Большинство примеров, которые я вижу в Интернете для использования EndReceive, ошибочны или вводят в заблуждение. Обычно это не вызывает проблем в примерах, потому что отправляется только одно предопределенное сообщение, а затем соединение закрывается.

Ответ 5

Также я столкнулся с такой же проблемой.

Когда я тестировал несколько раз, я обнаружил, что иногда несколько BeginReceive - EndReceive вызывают потерю пакетов. (Этот цикл был закончен ненадлежащим образом)

В моем случае я использовал два решения.

Сначала я определил достаточно размер пакета, чтобы сделать только 1 раз BeginReceive() ~ EndReceive();

Во-вторых, когда я получаю большой размер данных, я использовал NetworkStream.Read() вместо BeginReceive() - EndReceive().

Асинхронный сокет не прост в использовании, и ему нужно много понимать о сокете.

Ответ 6

Для информации (общее использование Begin/End) вы можете захотеть увидеть это сообщение в блоге; этот подход работает нормально для меня и сберегает большую боль...

Ответ 7

Это очень старая тема, но я нашел здесь что-то другое и нашел это:

Теперь это исправлено, работает нормально большую часть времени, но терпит неудачу, когда размер пакета кратен размеру буфера. Причина этого в том, что если буфер заполняется при чтении, предполагается, что данных больше; но та же проблема, что и раньше. Например, 2-байтовый буфер заполняется дважды в 4-байтовом пакете и предполагает, что данных больше. Затем он блокируется, потому что больше нечего читать. Проблема в том, что функция приема не знает, когда наступает конец пакета.

У меня была такая же проблема, и так как ни один из ответов, похоже, не решил эту проблему, я использовал Socket.Available.

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);    
    if (read > 0) 
    {
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (s.Available == 0)
        {
            // All data received, process it as you wish
        }
    }
    // Listen for more data
    s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                new AyncCallback(Async_Send_Receive.Read_Callback), so);
}

Надеюсь, что это помогает другим, ТАК помогли мне много раз, спасибо всем!