Подтвердить что ты не робот

Использование std:: mutex, std:: condition_variable и std:: unique_lock

У меня возникли проблемы с пониманием переменных состояния и их использованием с помощью мьютексов, я надеюсь, что сообщество может мне помочь. Обратите внимание: я пришел из фона win32, поэтому я использую CRITICAL_SECTION, HANDLE, SetEvent, WaitForMultipleObject и т.д.

Здесь моя первая попытка в concurrency с использованием стандартной библиотеки С++ 11, это модифицированная версия примера найденного здесь.

#include <condition_variable>
#include <mutex>
#include <algorithm>
#include <thread>
#include <queue>
#include <chrono>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{   
    std::queue<unsigned int>    nNumbers;

    std::mutex                  mtxQueue;
    std::condition_variable     cvQueue;
    bool                        m_bQueueLocked = false;

    std::mutex                  mtxQuit;
    std::condition_variable     cvQuit;
    bool                        m_bQuit = false;


    std::thread thrQuit(
        [&]()
        {
            using namespace std;            

            this_thread::sleep_for(chrono::seconds(7));

            // set event by setting the bool variable to true
            // then notifying via the condition variable
            m_bQuit = true;
            cvQuit.notify_all();
        }
    );

    std::thread thrProducer(
        [&]()
        {           
            using namespace std;

            int nNum = 0;
            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                   ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                nNum ++;

                unique_lock<mutex> qLock(mtxQueue);
                cout << "Produced: " << nNum << "\n";
                nNumbers.push( nNum );              
            }
        }
    );

    std::thread thrConsumer(
        [&]()
        {
            using namespace std;            

            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                    ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                unique_lock<mutex> qLock(mtxQueue);
                if( nNumbers.size() > 0 )
                {
                    cout << "Consumed: " << nNumbers.front() << "\n";
                    nNumbers.pop();
                }               
            }
        }
    );

    thrQuit.join();
    thrProducer.join();
    thrConsumer.join();

    return 0;
}

A Несколько вопросов об этом.

Я читал, что "любой поток, который намеревается ждать на std:: condition_variable, должен сначала получить std:: unique_lock."

Таким образом, у меня есть {quit mutex, condition variable и bool}, чтобы указать, когда было отправлено сообщение quit. Потребители и потребители должны каждый получить std:: unique_lock так:

std::unique_lock<std::mutex> lock(m_mtxQuit);

Это меня сбивает с толку. Разве это не блокирует мьютекс quit в первом потоке, тем самым блокируя второй? И если это правда, то как первый поток освобождает блокировку, чтобы начать другой поток?

Другой вопрос: если я изменил вызов wait_for(), чтобы подождать нулевые секунды, этот поток будет голоден. Может кто-нибудь объяснить? Я ожидаю, что он не будет блокироваться перед выполнением цикла while (правильно ли я предполагаю, что no_timeout recv'd вместо таймаута?).

Как я могу вызвать wait_for() и указать нулевое время, так что вызов wait_for() не блокируется, вместо этого он просто проверяет условие и продолжает?

Мне также будет интересно услышать о хороших ссылках на эту тему.

4b9b3361

Ответ 1

Не будет ли это блокирование мьютекса quit в первом потоке, тем самым блокируя второй?

Да.

И если это правда, то как первый поток освобождает блокировку, чтобы начать другой поток?

Когда вы ждете на condition_variable, он открывает блокировку, которую вы передаете, поэтому в

cvQuit.wait_for( lock, chrono::milliseconds(10) )

переменная условия вызовет lock.unlock(), а затем заблокирует до 10 мс (это происходит атомарно, поэтому нет окна между разблокировкой мьютекса и блокировкой, где условие может стать готовым, и вы пропустите его)

Когда мьютекс разблокирован, он позволяет другому потоку получить блокировку на нем.

Другой вопрос: если я изменил вызов wait_for(), чтобы подождать нулевые секунды, этот поток будет голоден. Может кто-нибудь объяснить?

Я ожидаю, что другой поток будет голоден, потому что мьютекс не разблокирован достаточно долго, чтобы другой поток заблокировал его.

Я правильно предполагаю, что no_timeout recv'd вместо таймаута?

Нет, если продолжительность времени проходит без условия готовности, она "истекает" даже после нулевого секунд.

Как я могу вызвать wait_for() и указать нулевое время, так что вызов wait_for() не блокируется, вместо этого он просто проверяет условие и продолжает?

Не используйте переменную условия! Если вы не хотите ждать, пока условие станет истинным, не ждите переменную условия! Просто проверьте m_bQuit и продолжайте. (Кроме того, почему ваши логические имена называются m_bXxx? Они не являются членами, поэтому префикс m_ вводит в заблуждение, а префикс b выглядит как эта ужасная привычка MS к венгерской нотации... которая воняет.)

Мне также будет интересно услышать о хороших ссылках на эту тему.

Лучшая ссылка: Anthony Williams С++ Concurrency В действии, который подробно описывает все атомические и потоковые библиотеки С++ 11, так как а также общие принципы многопоточного программирования. Одной из моих любимых книг на эту тему является Butenhof Программирование с потоками POSIX, что характерно для Pthreads, но объекты С++ 11 очень тесно связаны для Pthreads, поэтому легко переносить информацию из этой книги в многопоточность С++ 11.

N.B. В thrQuit вы пишете на m_bQuit, не защищая его мьютексом, поскольку ничто не мешает другому потоку, читающему его одновременно с записью, это условие гонки, т.е. Поведение undefined. Запись в bool должна быть либо защищена мьютексом, либо должна быть атомным типом, например. std::atomic<bool>

Я не думаю, что вам нужны два мьютекса, это просто добавляет разногласия. Поскольку вы никогда не выпускаете mtxQuit, но, ожидая на condition_variable, нет точки, имеющей второй мьютекс, mtxQuit уже гарантирует, что только один поток может сразу войти в критический раздел.

Ответ 2

Если вы хотите что-то проверить и продолжить, независимо от того, является ли оно истинным или нет (возможно, делает две разные вещи), тогда переменная условия используется неправильно. Переменная состояния - примитив низкого уровня для некоторого условия, связанного с заблокированной структурой данных, которую вы хотите ждать, не имея необходимости отжимать и освобождать блокировку. Канонический пример - очередь - у вас есть блокировка, защищающая доступ к очереди и два состояния vars (очередь не пустая и очередь не заполнена). Чтобы подтолкнуть что-то в очереди, вы получите блокировку, убедитесь, что она не заполнена, подождите, пока она не заполнена, если она есть, нажмите значение в очереди, сообщите не пустое состояние (поскольку оно больше не пусто) и отпустите замок. Поп-операция похожа.

Итак, в вашем случае у вас есть простая очередь, которая не может быть полной, поэтому вам нужен один замок и один condvar для него. Делает совершенный смысл. Но тогда у вас есть флаг "quit", который вы хотите запустить с триггером. Вы не хотите дожидаться, когда будет установлен флаг закрытия, - вы хотите действительно работать до тех пор, пока он не будет установлен - так что condvar действительно не имеет смысла здесь. Да, вы МОЖЕТЕ придумать сложную схему, которая заставит ее работать, но это будет путать, поскольку она не использует переменную условия как переменную условия.

Это имеет смысл (и яснее) просто использовать std::atomic<bool> для флага quit. Затем вы просто инициализируете его значением false, устанавливаете в true в свой поток quit и проверяете его в других потоках.