Асинхронный захват от блокировки - программирование

Асинхронный захват от блокировки

Я использую BlockingCollection для реализации шаблона производителя/потребителя. У меня есть асинхронный цикл, который заполняет коллекцию данными, которые будут обработаны, которые затем могут быть доступны клиенту в гораздо более позднее время. Пакеты появляются редко, и я хотел бы, чтобы опрос был выполнен без использования блокирующего вызова.

В сущности, я ищу что-то вроде BeginTake и EndTake, которого нет в блокирующей коллекции, чтобы я мог использовать внутренний пул потоков в обратном вызове. Он не должен быть BlockingCollection любым способом. Все, что делает то, что мне нужно, было бы здорово.

Это то, что у меня есть сейчас. _bufferedPackets является BlockingCollection<byte[]>:

public byte[] Read(int timeout)
{
    byte[] result;
    if (_bufferedPackets.IsCompleted)
    {
        throw new Exception("Out of packets");
    }
    _bufferedPackets.TryTake(out result, timeout);      
    return result;
}

Я хотел бы, чтобы это было что-то вроде этого, в псевдокоде:

public void Read(int timeout)
{
    _bufferedPackets.BeginTake(result =>
        {
            var bytes = _bufferedPackets.EndTake(result);
            // Process the bytes, or the resuting timeout
        }, timeout, _bufferedPackets);
}

Каковы мои варианты для этого? Я не хочу помещать поток any в состояние ожидания, так как для него есть много других компонентов ввода-вывода, и я бы быстро закончил потоки.

Обновление: Я переписал рассматриваемый код, чтобы использовать процесс async по-другому, по существу, заменяя обратные вызовы на основе того, есть ли запрос ожидания в пределах тайм-аута. Это прекрасно работает, но все равно было бы здорово, если бы был способ сделать это, не прибегая к таймерам и заменяя лямбды вокруг, что потенциально вызывает расовые условия и трудно писать (и понимать). Я решил это также с собственной реализацией очереди async, но все равно было бы здорово, если бы был более стандартный и хорошо протестированный вариант.

4b9b3361

Ответ 1

Таким образом, для этого не существует встроенной опции, я вышел и попытался изо всех сил сделать то, что хотел в качестве эксперимента. Оказывается, есть много проблем, чтобы сделать эту работу примерно как других пользователей старого асинхронного шаблона.

public class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue;
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult
    {
        public bool IsCompleted { get; set; }
        public WaitHandle AsyncWaitHandle { get; set; }
        public object AsyncState { get; set; }
        public bool CompletedSynchronously { get; set; }
        public T Result { get; set; }

        public AsyncCallback Callback { get; set; }
    }

    public AsyncQueue()
    {
        dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
        queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        DequeueAsyncResult asyncResult;
        while  (dequeueQueue.TryDequeue(out asyncResult))
        {
            if (!asyncResult.IsCompleted)
            {
                asyncResult.IsCompleted = true;
                asyncResult.Result = item;

                ThreadPool.QueueUserWorkItem(state =>
                {
                    if (asyncResult.Callback != null)
                    {
                        asyncResult.Callback(asyncResult);
                    }
                    else
                    {
                        ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
                    }
                });
                return;
            }
        }
        queue.Enqueue(item);
    }

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
    {
        T result;
        if (queue.TryDequeue(out result))
        {
            var dequeueAsyncResult = new DequeueAsyncResult
            {
                IsCompleted = true, 
                AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
                AsyncState = state, 
                CompletedSynchronously = true, 
                Result = result
            };
            if (null != callback)
            {
                callback(dequeueAsyncResult);
            }
            return dequeueAsyncResult;
        }

        var pendingResult = new DequeueAsyncResult
        {
            AsyncState = state, 
            IsCompleted = false, 
            AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
            CompletedSynchronously = false,
            Callback = callback
        };
        dequeueQueue.Enqueue(pendingResult);
        Timer t = null;
        t = new Timer(_ =>
        {
            if (!pendingResult.IsCompleted)
            {
                pendingResult.IsCompleted = true;
                if (null != callback)
                {
                    callback(pendingResult);
                }
                else
                {
                    ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
                }
            }
            t.Dispose();
        }, new object(), timeout, Timeout.Infinite);

        return pendingResult;
    }

    public T EndDequeue(IAsyncResult result)
    {
        var dequeueResult = (DequeueAsyncResult) result;
        return dequeueResult.Result;
    }
}

Я не слишком уверен в синхронизации свойства IsComplete, и я не слишком горячий, как только dequeueQueue очищается при последующих вызовах Enqueue. Я не уверен, когда правильное время для сигнатуры команд ожидания, но это лучшее решение, которое у меня до сих пор.

Пожалуйста, не учитывайте этот код качества продукции любыми способами. Я просто хотел показать общий смысл того, как я обернулся, чтобы все нитки вращаются, не ожидая блокировок. Я уверен, что в нем полно всяких краевых случаев и ошибок, но он удовлетворяет требованиям, и я хотел что-то вернуть людям, которые сталкиваются с вопросом.

Ответ 2

Возможно, я не понимаю вашу ситуацию, но не можете ли вы использовать неблокирующую коллекцию?

Я создал этот пример, чтобы проиллюстрировать:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTakeFromBlockingCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            var queue = new ConcurrentQueue<string>();

            var producer1 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("=======");
                    Thread.Sleep(10);
                }
            });

            var producer2 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("*******");
                    Thread.Sleep(3);
                }
            });

            CreateConsumerTask("One  ", 3, queue);
            CreateConsumerTask("Two  ", 4, queue);
            CreateConsumerTask("Three", 7, queue);

            producer1.Wait();
            producer2.Wait();
            Console.WriteLine("  Producers Finished");
            Console.ReadLine();
        }

        static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string result;
                    if (queue.TryDequeue(out result))
                    {
                        Console.WriteLine("  {0} consumed {1}", taskName, result);
                    }
                    Thread.Sleep(sleepTime);
                }
            });
        }
    }
}

Вот выход программы

enter image description here

Я считаю, что BlockingCollection предназначен для переноса параллельной коллекции и обеспечения механизма, позволяющего нескольким потребителям блокировать; ожидая производителей. Это использование противоречит вашим требованиям.

Я нашел эту статью статью о классе BlockingCollection, чтобы быть полезной.

Ответ 3

Я уверен, что BlockingCollection<T> не может этого сделать, вам придется сворачивать самостоятельно. Я придумал это:

class NotifyingCollection<T>
{
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>();
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>();

    private object _lock = new object();

    public void Add(T item)
    {
        _overflow.Enqueue(item);
        Dispatch();
    }

    private void Dispatch()
    {
        // this lock is needed since we need to atomically dequeue from both queues...
        lock (_lock)
        {
            while (_overflow.Count > 0 && _subscribers.Count > 0)
            {
                Action<T> callback;
                T item;

                var r1 = _overflow.TryDequeue(out item);
                var r2 = _subscribers.TryDequeue(out callback);

                Debug.Assert(r1 && r2);
                callback(item);
                // or, optionally so that the caller thread doesn't take too long ...
                Task.Factory.StartNew(() => callback(item));
                // but you'll have to consider how exceptions will be handled.
            }
        }
    }

    public void TakeAsync(Action<T> callback)
    {
        _subscribers.Enqueue(callback);
        Dispatch();
    }
}

Я использовал поток, который вызывает TakeAsync() или Add(), чтобы служить в качестве потока обратного вызова. Когда вы вызываете Add() или TakeAsync(), он будет пытаться отправить все поставленные в очередь элементы в вызываемые вызовы. Таким образом, нет ни одного потока, который просто сидит там, спящий, ожидая, чтобы его оповещали.

Эта блокировка является довольно уродливой, но вы сможете ставить в очередь и подписываться на несколько потоков без блокировки. Я не мог найти способ сделать эквивалент только dequeue, если там что-то доступно в другой очереди, не используя эту блокировку.

Примечание. Я проверил это минимально, используя несколько потоков.