Подтвердить что ты не робот

Остановка С++ 11 std:: threads, ожидающие на std:: condition_variable

Я пытаюсь понять основные механизмы многопоточности в новом стандарте С++ 11. Самый простой пример, который я могу придумать, следующий:

  • Производитель и потребитель реализуются в отдельных потоках.
  • Производитель помещает определенное количество элементов внутри очереди
  • Потребитель берет элементы из очереди, если есть какие-либо существующие

Этот пример также используется во многих школьных книгах о многопоточности, и все, что касается процесса коммуникации, отлично работает. Однако у меня есть проблема, когда дело доходит до остановки потребительского потока.

Я хочу, чтобы потребитель работал до тех пор, пока он не получил явный сигнал остановки (в большинстве случаев это означает, что я жду, пока продюсер закончит, чтобы я мог остановить пользователя до завершения программы). К сожалению, в потоках С++ 11 отсутствует механизм прерывания (который я знаю из многопоточности в Java, например). Таким образом, я должен использовать такие флаги, как isRunning, чтобы сигнализировать, что я хочу остановить поток.

Основная проблема заключается в следующем: после того как я остановил поток производителя, очередь пуста и потребитель ждет на condition_variable, чтобы получить сигнал, когда очередь будет заполнена снова. Поэтому мне нужно разбудить поток, вызвав notify_all() в переменной перед выходом.

Я нашел рабочее решение, но оно кажется каким-то грязным. Ниже приведен пример кода (извините, но каким-то образом я не смог уменьшить размер кода для любого минимального минимального примера):

Класс Queue:

class Queue{
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            throw std::exception("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        m_isProgramStopped = true;
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

Продюсер:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

Потребитель:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (std::exception e){
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    bool m_isRunning;
};

И, наконец, main():

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

Правильно ли это закончить поток, ожидающий переменную условия, или есть лучшие методы? В настоящее время очередь должна знать, остановлена ​​ли программа (которая, на мой взгляд, уничтожает свободную связь компонентов), и мне нужно вызвать stop() в очереди явно, что кажется неправильным.

Дополнительно, переменная условия, которая должна использоваться только как сингл, если очередь пуста, теперь означает другое условие - если программа закончилась. Если я не ошибаюсь, каждый раз, когда поток ожидает переменную условия для какого-либо события, он также должен будет проверить, нужно ли остановить поток, прежде чем продолжить его выполнение (что также кажется неправильным).

Есть ли у меня эти проблемы, потому что вся моя конструкция неисправна или мне не хватает некоторых механизмов, которые можно использовать для чистого потока потоков?

4b9b3361

Ответ 1

Нет, нет ничего плохого в вашем дизайне, и это нормальный подход, применяемый к этой проблеме.

Совершенно верно для вас иметь несколько условий (например, что-либо в очереди или остановке программы), привязанных к переменной условия. Главное, что биты в условии проверяются, когда возвращается wait.

Вместо того, чтобы иметь флаг в Queue, чтобы указать, что программа останавливается, вы должны думать о том, что флаг "я могу принять". Это лучшая общая парадигма и работает лучше в многопоточной среде.

Кроме того, вместо pop вызывать исключение, если кто-то его вызывает и stop был вызван, вы можете заменить метод bool try_pop(int &value), который вернет true, если значение было возвращено, иначе false, Таким образом, вызывающий может проверить, не удалось ли остановить очередь (добавьте метод bool is_stopped() const). Хотя обработка исключений работает здесь, она немного тяжелая, и на самом деле это не исключительный случай в многопоточной программе.

Ответ 2

wait можно вызвать с таймаутом. Элемент управления возвращается в поток, и stop можно проверить. В зависимости от этого значения он может wait на большее количество предметов, которые будут потребляться или закончить выполнение. Хорошим введением в многопоточность с С++ является С++ 11 Concurrency.