Подтвердить что ты не робот

Реализация очереди блокировки в С#

Я использую приведенный ниже код для реализации и тестирования очереди блокировки. Я тестирую очередь, запустив 5 параллельных потоков (удалителей), чтобы вытащить элементы из очереди, блокируя, если очередь пуста и 1 параллельный поток (сумматор), чтобы добавить элементы в очередь. Однако, если я оставлю его достаточно долго, я получаю исключение, потому что один из потоков удаления выходит из состояния ожидания, даже когда очередь пуста.

Кто-нибудь знает, почему я получаю исключение? Заметьте, мне интересно узнать, почему это не работает в отличие от рабочего решения (как я могу просто Google это).

Я очень благодарен за вашу помощь.

using System;
using System.Threading;
using System.Collections.Generic;

namespace Code
{
    class Queue<T>
    {
        private List<T> q = new List<T>();

        public void Add(T item)
        {
            lock (q)
            {
                q.Add(item);
                if (q.Count == 1)
                {
                    Monitor.Pulse(q);
                }
            }
        }

        public T Remove()
        {
            lock (q)
            {
                if (q.Count == 0)
                {
                    Monitor.Wait(q);
                }
                T item = q[q.Count - 1];
                q.RemoveAt(q.Count - 1);
                return item;
            }
        }
    }

    class Program
    {
        static Random r = new Random();
        static Queue<int> q = new Queue<int>();
        static int count = 1;
        static void Adder()
        {
            while (true)
            {
                Thread.Sleep(1000 * ((r.Next() % 5) + 1));
                Console.WriteLine("Will try to add");
                q.Add(count++);
            }
        }

        static void Remover()
        {
            while (true)
            {
                Thread.Sleep(1000 * ((r.Next() % 5) + 1));
                Console.WriteLine("Will try to remove");
                int item = q.Remove();
                Console.WriteLine("Removed " + item);
            }
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Test");

            for (int i = 0; i < 5; i++)
            {
                Thread remover = new Thread(Remover);
                remover.Start();
            }

            Thread adder = new Thread(Adder);
            adder.Start();
        }
    }
}
4b9b3361

Ответ 1

если я оставлю его достаточно долго, я получаю исключение, потому что один из потоков удаления выходит из состояния ожидания, даже когда очередь пуста. Кто-нибудь знает, почему я получаю исключение?

Вопрос нечетный, потому что, очевидно, вы знаете ответ: ваше первое предложение отвечает на вопрос, заданный вторым предложением. Вы получаете исключение, потому что поток удаления выходит из состояния ожидания, когда очередь пуста.

Чтобы решить проблему, вы хотите использовать цикл вместо "if". Правильный код:

while(q.Count == 0) Monitor.Wait(q);

не

if(q.Count == 0) Monitor.Wait(q);

UPDATE:

Комментарий указывает, что, возможно, ваш вопрос должен был быть "при каких обстоятельствах потребительский поток может получить монитор, когда очередь пуста?"

Хорошо, вы в лучшем положении, чтобы ответить на этот вопрос, чем мы, поскольку вы используете программу и смотрите на выход. Но как раз от моей головы, вот путь, который может произойти:

  • Потребительский поток 1: ожидание
  • Потребительский поток 2: готов
  • Производитель 3: владеет монитором
  • В очереди есть один элемент.
  • Резьба 3 импульса.
  • Тема 1 переходит в состояние готовности.
  • Тема 3 оставляет монитор.
  • Ввод 2 в монитор.
  • Тема 2 потребляет элемент в очереди
  • Тема 2 оставляет монитор.
  • Тема 1 входит в монитор.

И теперь поток 1 находится на мониторе с пустой очередью.

Вообще говоря, рассуждая о подобных проблемах, вы должны думать о "Пульсе" как о голубине с прикрепленной к нему запиской. После его освобождения он не имеет никакого отношения к отправителю, и если он не может найти свой дом, он умирает в пустыне, а его сообщение остается недостижимым. Все, что вы знаете, когда Pulse - это то, что если есть ожидание потока, тогда один поток переместится в состояние готовности в будущем; вы ничего не знаете о относительном времени операций над потоками.

Ответ 2

Ваш код будет работать, если есть 1 потребитель, но когда их больше, этот механизм выходит из строя, и он должен быть while(q.Count == 0) Monitor.Wait(q)

Следующий сценарий показывает, когда if(q.Count == 0) Monitor.Wait(q) будет терпеть неудачу (он отличается от Eric's):

  • потребитель 1 ждет
  • производитель поставил элемент и пульсирует.
  • потребитель 1 готов
  • производитель освобождает блокировку
  • потребитель 2 только что вошел Удалить, повезло и приобретает замок
  • потребитель 2 видит 1 элемент, не ждет и берет элемент
  • потребитель 2 освобождает блокировку
  • потребитель 1 перезаписывает блокировку, но очередь пуста.

Это происходит точно так же, как документация говорит, что это может произойти:

Когда поток, который вызвал импульс, освобождает блокировку, следующий поток в готовой очереди (который не обязательно является импульсом) получает блокировку.

Ответ 3

Эрик, конечно, прав; дело в том, что, хотя код, похоже, охватывает все базы; тот факт, что произошло исключение, показывает, что вы этого не сделали.

Условие гонки заключается в том, что между Monitor.Wait на съемнике и Monitor.Pulse на сумматоре (который освобождает блокировку, но не обязательно сразу же запускает поток, ожидающий пробуждения и повторно приобретает его); последующая удаленная нить может получить блокировку и сразу же перейти на

if (q.Count == 0) 
{ 
  Monitor.Wait(q); 
} 

Заявление и сразу же удалите элемент. Затем поток Pulse d просыпается и предполагает наличие там предмета; но нет.

Способ исправления, независимо от того, как проявляется состояние гонки, - это, как сказал Эрик.

Аналогично, если вы прочтете пример на Monitor.Pulse, вы увидите аналогичную настройку с тем, что вы здесь сделали, но тонким образом сделать это.