Подтвердить что ты не робот

Использование BlockingCollection <>: OperationCanceledException, есть ли лучший способ?

Я использую (откровенно большой) тип BlockingCollection<> для сильно многопоточного высокопроизводительного приложения.

Там много пропускной способности через коллекцию, и на микроуровне она очень эффективна. Тем не менее, для каждой "партии" он всегда будет завершен, помечая токен отмены. Это приводит к тому, что исключение вызывает любой ожидающий вызов Take. Это хорошо, но я бы установил для возвращаемого значения или выходного параметра, чтобы сигнализировать об этом, потому что a) исключения имеют очевидные накладные расходы и b) при отладке я не хочу вручную отключать исключение break-on-exception для этого конкретного исключение.

Реализация кажется интенсивной, и, теоретически, я могу разобрать и воссоздать мою собственную версию, которая не использовала исключения, но, возможно, существует менее сложный способ?

Я мог бы добавить объект null (или если нет, заполнитель) в коллекцию, чтобы обозначить, что процесс должен завершиться, однако также должно быть средство для прекращения красиво, т.е. пробуждение ожидающих потоков и рассказать им как-то что-то продолжалось.

Итак - альтернативные типы коллекций? Воссоздайте свой собственный? Какой-то способ злоупотреблять этим?

(В некотором контексте: я пошел с BlockingCollection<>, потому что у него есть преимущество перед ручным блокированием вокруг Queue. Насколько я могу сказать, использование примитивов для потоковой обработки превосходно и в моем случае - несколько миллисекунд здесь - и - там и оптимальное ядро ​​имеет решающее значение.)

Изменить: я только что открыл щедрость для этого. Я не верю, что ответ Анастасиоляля охватывает вопрос, который я поднимаю в своих комментариях. Я знаю, что это сложная проблема. Кто-нибудь может помочь?

4b9b3361

Ответ 1

Как я уже сказал, вы уже сделали сами, глядя в отраженный источник BlockingCollection, к сожалению, к сожалению, когда CancellationToken передается в BlockingCollection, и он отменяется, вы получите исключение OperationCancelledException, как можно увидеть на изображении ниже (с несколько обходных путей после изображения)

GetConsumingEnumerable вызывает TryTakeWithNoTimeValidation на BlockingCollection, который, в свою очередь, вызывает это исключение.

enter image description here

Обходной путь № 1

Одна потенциальная стратегия будет заключаться в том, что, если у вас будет больше контроля над вашими производителями и вашими потребителями, вместо того, чтобы передавать токен отмены в BlockingCollection (что вызовет это исключение), вы передаете токен отмены своим производителям и вашим потребителям.

Если ваши производители не производят, а ваши потребители не потребляют, вы фактически отменили операцию, не поднимая это исключение, и передав CancellationToken.None в своем BlockingCollection.

Специальные случаи Отмена, когда BlockingCollection находится в BoundedCapacity или Empty

Производители заблокированы. Нити производителя будут заблокированы при достижении BoundedCapacity в BlockingCollection. Следовательно, при попытке отменить и BlockingCollection находится в BoundedCapacity (что означает, что ваши потребители не заблокированы, а производители заблокированы, потому что они не могут добавить какие-либо дополнительные элементы в очередь), тогда вам нужно будет разрешить использование дополнительных предметов (один для каждого потока производителей), который разблокирует производителей (потому что они заблокированы при добавлении к blockingCollection) и, в свою очередь, позволит вам отменить логику отмены на стороне производителя.

Потребители заблокированы. Когда ваши потребители заблокированы, потому что очередь пуста, вы можете вставить пустую единицу работы (по одному для каждого потребительского потока) в коллекции блокировки, чтобы разблокировать пользователя потоки и позволяют логике отмены действовать в сторону потребителя.

Когда в очереди находятся элементы, и никаких ограничений, таких как BoundedCapacity или Empty, не было достигнуто, производители и потребительские потоки не должны блокироваться.

Обходной путь №2

Использование единицы отмены работы.

Когда ваше приложение должно быть отменено, ваши продюсеры (возможно, только один продюсер будет достаточен, а другие просто отменит производство), создаст блок отмены (может быть нулевым, как вы упомянули, или какой-то класс, который реализует интерфейс маркера). Когда потребители потребляют эту единицу работы и обнаруживают, что она фактически является единицей отмены, их логика отмены срабатывает. Количество единиц отмены единицы, подлежащих производству, должно быть равно количеству потребительских потоков.

Опять же, необходимо проявлять осторожность, когда мы близки к BoundedCapacity, поскольку это может быть признаком того, что некоторые из производителей заблокированы. В зависимости от количества производителей/потребителей вы можете потреблять потребитель, пока все производители (но 1) не прекратят работу. Это гарантирует, что вокруг нет задерживающих производителей. Когда останется только один производитель, ваш последний потребитель может отключиться, и продюсер может прекратить производство единиц отмены работы.

Ответ 2

Как насчет BlockingQueue, который я сделал некоторое время назад?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

Он должен делать все, что угодно, без каких-либо исключений. Текущая очередь просто закрывает событие на диске, которое может быть не таким, каким вы хотите. Вы можете захотеть сделать enque null и подождать, пока все элементы не будут обработаны. Кроме того, это должно соответствовать вашим потребностям.

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Deques an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immedieately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// The will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits the until empty. This does not mean that all items are already process. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns a untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}

Ответ 3

Вы завершите передачу пакета, установив флаг на последнем элементе (добавьте к нему свойство IsLastItem bool или заверните его). Или вы можете отправить нуль в качестве последнего элемента (не уверен, что нуль проходит через blockingcollection правильно).

Если вы можете удалить необходимость в "пакетной" концепции, вы можете создать дополнительный поток, чтобы непрерывно брать() и обрабатывать новые данные из вашего блока блокировки и больше ничего не делать.

Ответ 4

Кирен,

Из моего осмотра я лично не знаю нити типа безопасности для шаблона ProducerConsumer, который делает именно то, что вы хотели. Я не претендую на это как конкурентное решение, но предлагаю вам украсить BlockingCollection<T> несколькими extension method, которые предоставят вам свободу поставлять любые встроенные или настраиваемые типы вместо стандартных CancellationToken.

Этап 1:

Ниже приведен список методов по умолчанию, которые используют метод underling TryAddWithNoTimeValidation для добавления в очередь.

public void Add(T item){
      this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
      this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
    }

public bool TryAdd(T item){
      return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
    }

public bool TryAdd(T item, TimeSpan timeout){
      BlockingCollection<T>.ValidateTimeout(timeout);
      return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
    }

public bool TryAdd(T item, int millisecondsTimeout){
      BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
      return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new           CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
 BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
 return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

Теперь вы можете предоставить расширение для любого/всего метода, который вам интересен.

Этап 2:

Теперь вы ссылаетесь на свою реализацию TryAddWithNoTimeValidation вместо стандартного.

Я могу дать вам альтернативную версию TryAddWithNoTimeValidation, которая безопасно продолжит без исключения OperationCancellation.