Подтвердить что ты не робот

Базовые данные байта в С#

Мое приложение считывает байты из сокета TCP и требует их буферизации, поэтому я могу извлечь из них сообщения позже. Из-за характера TCP я могу получать частичные или несколько сообщений в одном чтении, поэтому после каждого чтения я хотел бы проверить буфер и извлечь столько сообщений, сколько доступно.

Поэтому я хочу класс, который позволяет мне сделать следующее:

  • добавить к нему произвольные байты [].
  • проверять содержимое без его использования, в частности проверять количество контента, а также искать наличие определенного байта или байта
  • извлечь и использовать часть данных в виде байта [], оставив остальную часть там для будущего чтения

Я ожидаю, что то, что я хочу, может быть сделано с 1 или более существующими классами в библиотеке .NET, но я не уверен, какие из них. System.IO.MemoryStream выглядит примерно так, как я хочу, но (а) неясно, подходит ли он для использования в качестве буфера (извлекаются ли данные чтения из емкости?) и (б) чтения и записи, похоже, происходят в одном месте - "Текущее положение потока - это позиция, в которой может выполняться следующая операция чтения или записи". - Это не то, что я хочу. Мне нужно писать до конца и читать с фронта.

4b9b3361

Ответ 1

Просто используйте большой байтовый массив и Array.Copy - он должен сделать трюк. Если нет, используйте List<byte>.

Если вы используете массив, вам нужно реализовать индекс (где вы копируете дополнительные данные) самостоятельно (то же самое для проверки содержимого), но это просто.

Если вам интересно: вот простая реализация "циклического буфера". Тест должен выполняться (я набросил на него пару unit test, но он не проверял весь критический путь):

public class ReadWriteBuffer
{
    private readonly byte[] _buffer;
    private int _startIndex, _endIndex;

    public ReadWriteBuffer(int capacity)
    {
        _buffer = new byte[capacity];
    }

    public int Count
    {
        get
        {
            if (_endIndex > _startIndex)
                return _endIndex - _startIndex;
            if (_endIndex < _startIndex)
                return (_buffer.Length - _startIndex) + _endIndex;
            return 0;
        }
    }

    public void Write(byte[] data)
    {
        if (Count + data.Length > _buffer.Length)
            throw new Exception("buffer overflow");
        if (_endIndex + data.Length >= _buffer.Length)
        {
            var endLen = _buffer.Length - _endIndex;
            var remainingLen = data.Length - endLen;

            Array.Copy(data, 0, _buffer, _endIndex, endLen);
            Array.Copy(data, endLen, _buffer, 0, remainingLen);
            _endIndex = remainingLen;
        }
        else
        {
            Array.Copy(data, 0, _buffer, _endIndex, data.Length);
            _endIndex += data.Length;
        }
    }

    public byte[] Read(int len, bool keepData = false)
    {
        if (len > Count)
            throw new Exception("not enough data in buffer");
        var result = new byte[len];
        if (_startIndex + len < _buffer.Length)
        {
            Array.Copy(_buffer, _startIndex, result, 0, len);
            if (!keepData)
                _startIndex += len;
            return result;
        }
        else
        {
            var endLen = _buffer.Length - _startIndex;
            var remainingLen = len - endLen;
            Array.Copy(_buffer, _startIndex, result, 0, endLen);
            Array.Copy(_buffer, 0, result, endLen, remainingLen);
            if (!keepData)
                _startIndex = remainingLen;
            return result;
        }
    }

    public byte this[int index]
    {
        get
        {
            if (index >= Count)
                throw new ArgumentOutOfRangeException();
            return _buffer[(_startIndex + index) % _buffer.Length];
        }
    }

    public IEnumerable<byte> Bytes
    {
        get
        {
            for (var i = 0; i < Count; i++)
                yield return _buffer[(_startIndex + i) % _buffer.Length];
        }
    }
}

Обратите внимание: код "потребляет" при чтении - если вы не хотите, чтобы просто удалили части "_startIndex =..." (или внесите дополнительный параметр перегрузки и проверьте или что-то еще).

Ответ 2

Я предлагаю вам использовать MemoryStream под капотом, но инкапсулировать его в другой класс, который хранит:

  • MemoryStream
  • Текущее "прочитанное" положение
  • Текущее "потребляемое" положение

Затем будет показано:

  • Запись: установка позиции потока до конца, запись данных, установка позиции потока обратно в позицию чтения
  • Чтение: чтение данных, установка позиции чтения в позицию потока.
  • Потребляйте: обновляйте потребляемую позицию (детали, основанные на том, как вы пытаетесь потреблять); если позиция потребления превышает определенный порог, скопируйте существующие буферизованные данные в новый MemoryStream и обновите все переменные. (Вероятно, вы не хотите копировать буфер для каждого запроса на потребление.)

Обратите внимание, что ни одно из этого не будет потокобезопасным без дополнительной синхронизации.

Ответ 3

Здесь другая реализация буфера, который я написал некоторое время назад:

  • Resizeable: позволяет помещать в очередь данные и исключать исключение переполнения буфера;
  • Эффективный: использует один буфер и операции Buffer.Copy для ввода/выгрузки данных

Ответ 4

Поступая так поздно, но для потомков:

Когда я делал это в прошлом, я придерживался небольшого разного подхода. Если ваши сообщения имеют фиксированный размер заголовка (который сообщает вам, сколько байтов в теле), и, учитывая, что сетевой поток уже буферизуется, я выполняю операцию в две фазы:

  • чтение потока для байтов для заголовка
  • последующее чтение потока для байтов для тела на основе заголовка
  • повторить

Это усиливает тот факт, что - для потока - когда вы запрашиваете "n" байты, вы никогда не получите больше назад, так что вы можете игнорировать многие "opps", которые я читаю слишком много, позвольте мне отложить это до следующего времени ".

Теперь это не вся история, чтобы быть справедливым. У меня был базовый класс-оболочка над потоком, чтобы обрабатывать проблемы фрагментации (т.е. При запросе на 4 байта, не возвращайте до получения 4 байтов или поток закрыт). Но этот бит довольно прост.

На мой взгляд, ключ состоит в том, чтобы развязать обработку сообщений с помощью механики потока, и если вы перестанете пытаться использовать это сообщение в виде единственного ReadBytes() из потока, жизнь становится намного проще.

[все это верно, если ваши чтения блокируются, или async (APM/await)]

Ответ 5

Я думаю, что BufferedStream является решением проблемы. Также можно перевести непрочитанный len байт данных, вызвав Seek.

BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size
...
...
while(...)
{
    buffer.Read(...);
    // do my staff
    // I read too much, I want to put back len bytes
    buffer.Seek(-len, SeekOrigin.End);
    // I shall come back and read later
}

Растущая память

В отличие от BufferedStream, где первоначально задан size, MemoryStream может расти.

Память потоковых данных

MemoryStream хранит все чтение dara все время, а BufferedStream содержит только сегмент данных потока.

Исходный поток и массив байтов

MemoryStream позволяет добавлять входные байты в метод Write(), который может быть Read() в будущем. Пока BufferedSteam принимает входные байты из другого источника-источника, указанного в конструкторе.

Ответ 6

Похоже, вы хотите читать из сокета в буфер MemoryStream, а затем "вытаскивать" данные из буфера и reset каждый раз, когда встречается определенный байт. Он будет выглядеть примерно так:

void ReceiveAllMessages(Action<byte[]> messageReceived, Socket socket)
{
    var currentMessage = new MemoryStream();
    var buffer = new byte[128];

    while (true)
    {
        var read = socket.Receive(buffer, 0, buffer.Length);
        if (read == 0)
            break;     // Connection closed

        for (var i = 0; i < read; i++)
        {
            var currentByte = buffer[i];
            if (currentByte == END_OF_MESSAGE)
            {
                var message = currentMessage.ToByteArray();
                messageReceived(message);

                currentMessage = new MemoryStream();
            }
            else
            {
                currentMessage.Write(currentByte);
            }
        }
    }
}

Ответ 7

Вы можете сделать это с помощью Stream обертывания ConcurrentQueue<ArraySegment<byte>> (помните, что это делает его только вперед). Однако мне действительно не нравится идея хранить данные в памяти, прежде чем что-то делать с ней; он раскрывает вас до кучи атак (намеренно или нет) относительно размера сообщения. Вы также можете Google "круговой буфер" .

Фактически вы должны писать код, который делает что-то значимое с данными, как только он будет получен: "Push Parsing" (это то, что, например, SAX поддерживает). В качестве примера, как вы это сделаете с текстом:

private Encoding _encoding;
private Decoder _decoder;
private char[] _charData = new char[4];

public PushTextReader(Encoding encoding)
{
    _encoding = encoding;
    _decoder = _encoding.GetDecoder();
}

// A single connection requires its own decoder
// and charData. That connection should never
// call this method from multiple threads
// simultaneously.
// If you are using the ReadAsyncLoop you
// don't need to worry about it.
public void ReceiveData(ArraySegment<byte> data)
{
    // The two false parameters cause the decoder
    // to accept 'partial' characters.
    var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false);
    charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false);
    OnCharacterData(new ArraySegment<char>(_charData, 0, charCount));
}

Если вы должны иметь возможность принимать полные сообщения перед десериализацией их, вы можете использовать MemoryMappedFile, что имеет то преимущество, что отправляющий объект не сможет вывести из памяти ваш сервер. То, что становится сложным, - это сброс файла обратно до нуля; потому что это может быть проблема. Один из способов решения этой проблемы:

Окончание TCP-приемника

  • Напишите текущему потоку.
  • Если поток превышает определенную длину, перейдите к новой.

Конец десериализации

  • Чтение из текущего потока.
  • Как только вы опорожнили поток, уничтожайте его.

Концевой терминал TCP очень прост. Для конца десериализатора потребуется логическая логика элементарного буфера (не забудьте использовать Buffer.BlockCopy, а не Array.Copy).

Боковое примечание: звучит как забавный проект, если у меня есть время и помню, что я мог бы продолжить и реализовать эту систему.

Ответ 8

Здесь есть только три ответа, которые содержат код. Один из них неуклюжий, а другие не отвечают на вопрос.

Здесь класс, который вы можете просто скопировать и вставить:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}