Подтвердить что ты не робот

Непрерывно читаешь из потока?

У меня есть объект Stream, который иногда получает некоторые данные на нем, но с непредсказуемыми интервалами. Сообщения, которые появляются в потоке, четко определены и заранее объявляют размер своей полезной нагрузки (размер - это 16-разрядное целое число, содержащееся в первых двух байтах каждого сообщения).

Я хотел бы иметь класс StreamWatcher, который обнаруживает, когда Stream имеет некоторые данные на нем. Как только это произойдет, я хочу, чтобы событие было создано так, чтобы подписанный экземпляр StreamProcessor мог обработать новое сообщение.

Можно ли это сделать с событиями С# без непосредственного использования потоков? Похоже, что это должно быть просто, но я не могу дотянуться до правильного пути его разработки.

4b9b3361

Ответ 1

Если вы говорите, что не используете потоки напрямую, я предполагаю, что вы все равно хотите использовать их косвенно через асинхронные вызовы, иначе это не очень полезно.

Все, что вам нужно сделать, это обернуть методы async для Stream и сохранить результат в буфере. Во-первых, определим часть события spec:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs
{
    public MessageAvailableEventArgs(int messageSize) : base()
    {
        this.MessageSize = messageSize;
    }

    public int MessageSize { get; private set; }
}

Теперь, прочитайте одно 16-битовое целое из потока асинхронно и отчитайте, когда он готов:

public class StreamWatcher
{
    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    }

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    {
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    }

    protected void WatchNext()
    {
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    }

    private void ReadCallback(IAsyncResult ar)
    {
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    }

    public event MessageAvailableEventHandler MessageAvailable;
}

Я думаю об этом. Это предполагает, что любой класс, обрабатывающий сообщение, также имеет доступ к Stream и готов прочитать его, синхронно или асинхронно, на основе размера сообщения в событии. Если вы хотите, чтобы класс наблюдателя фактически прочитал все сообщение, вам придется добавить еще несколько кодов для этого.

Ответ 2

Обычный подход заключается в использовании .NET асинхронного шаблона программирования, выставленного Stream. По сути, вы начинаете читать асинхронно, вызывая Stream.BeginRead, передавая ему буфер byte[] и метод обратного вызова, который будет вызываться, когда данные будут считаны из потока. В методе обратного вызова вы вызываете Stream.EndRead, передавая ему аргумент IAsncResult, который был передан вашему обратному вызову. Возвращаемое значение EndRead сообщает, сколько байтов было прочитано в буфере.

Как только вы получили первые несколько байтов таким образом, вы можете дождаться остальной части сообщения (если вы не получили достаточное количество данных в первый раз), снова вызвав BeginRead. Когда вы получите сообщение целиком, вы можете поднять это событие.

Ответ 3

Не использует Stream.BeginRead() так же, как с помощью синхронного метода Stream.Read() в отдельном потоке?

Ответ 4

Да, это можно сделать. Используйте неблокирующий Stream.BeginRead метод с AsyncCallback. Обратный вызов называется асинхронным, когда данные становятся доступными. В обратном вызове вызовите Stream.EndRead, чтобы получить данные, и позвоните Stream.BeginRead, чтобы получить следующий фрагмент данных. Буфер поступает в массив байтов, который достаточно велик для хранения сообщения. Как только массив байтов будет заполнен (может потребоваться несколько обратных вызовов), поднимите событие. Затем прочитайте следующий размер сообщения, создайте новый буфер, повторите, сделайте.