Подтвердить что ты не робот

Каков наилучший способ ожидания нескольких переменных условий в С++ 11?

Сначала немного контекста. Я изучаю потоки в С++ 11 и для этой цели я пытаюсь создать небольшой класс actor, по существу ( Я оставил данные обработки исключений и распространения) так:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

Каждый актер работает в своем собственном actor_thread, ждет нового входящего сообщения на incoming_msgs и - когда приходит сообщение - обрабатывает его.

actor_thread создается вместе с actor и должен умереть вместе с ним, поэтому мне нужен какой-то механизм прерывания в message_queue::wait_and_pop(std::condition_variable interrupt).

По сути, я требую, чтобы блоки wait_and_pop до тех пор, пока а) появляется новый message или b) до тех пор, пока не будет запущен interrupt, и в этом случае - в идеале - interrupted_exception.

Прибытие нового сообщения в message_queue в настоящее время смоделировано также с помощью std::condition_variable new_msg_notification:

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

Чтобы сократить длинную историю, вопрос заключается в следующем: как мне прерывать ожидание нового сообщения в new_msg_notification.wait(...) при срабатывании interrupt (без введения тайм-аута )?

В качестве альтернативы вопрос может быть прочитан как: Как подождать, пока не будет сигнализирован какой-либо один из двух std::condition_variable?

Один наивный подход, по-видимому, заключается не в том, чтобы использовать std::condition_variable вообще для прерывания, а вместо этого просто использовать атомный флаг std::atomic<bool> interrupted, а затем занят ожидание на new_msg_notification с очень малым тайм-аутом, пока не появится новое сообщение прибыл или до true==interrupted. Тем не менее, я очень хотел бы избежать ожидания.


EDIT:

Из комментариев и ответа от пикрова, похоже, что в принципе возможны два подхода.

  • Задать специальное сообщение "Terminate", предложенное Аланом, мукундой и пикрой. Я решил отказаться от этого варианта, потому что я понятия не имею о размере очереди в то время, когда я хочу, чтобы актер закончил работу. Это может быть очень хорошо (как это обычно бывает, когда я хочу что-то быстро прекратить), что в очереди осталось несколько сообщений, и кажется неприемлемым ждать их обработки до тех пор, пока окончательное сообщение не получит включите.
  • Реализовать пользовательскую версию переменной условия, которая может быть прервана другим потоком путем перенаправления уведомления на переменную условия, в которой ожидает первый поток. Я выбрал этот подход.

Для тех, кто вас интересует, моя реализация идет следующим образом. Переменная условия в моем случае на самом деле является semaphore (потому что мне они нравятся больше и потому, что мне это нравится). Я снабдил этот семафор ассоциированным interrupt, который можно получить из семафора через semaphore::get_interrupt(). Если теперь один поток блокируется в semaphore::wait(), другой поток имеет возможность вызывать semaphore::interrupt::trigger() при прерывании семафора, заставляя первый поток разблокировать и распространять interrupt_exception.

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

Используя этот semaphore, моя реализация очереди сообщений теперь выглядит так (используя семафор вместо std::condition_variable, я мог бы избавиться от std::mutex:

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

My actor теперь может прерывать поток с очень низкой задержкой в ​​потоке. Реализация в настоящее время выглядит следующим образом:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};

Я также добавил полный рабочий пример в http://goo.gl/2GDAa1

4b9b3361

Ответ 1

Вы спрашиваете,

Каков наилучший способ ожидания нескольких переменных условий в С++ 11?

Вы не можете и должны перепроектировать. Один поток может ожидать только одну переменную условия (и связанный с ней мьютекс) за раз. В этом отношении возможности Windows для синхронизации довольно богаче, чем у семейств примитивов синхронизации типа POSIX.

Типичный подход с потокобезопасными очередями заключается в установке специального "все сделано!". сообщение или создать "прерывистую" (или "выключенную" ) очередь. В последнем случае переменная внутреннего условия очереди затем защищает сложный предикат: либо элемент доступен, либо очередь была сломана.

В комментарии вы заметили, что

a notify_all() не будет иметь никакого эффекта, если никто не ждет

Это правда, но, вероятно, не актуально. wait() Включение переменной условия также подразумевает проверку предиката и проверку его перед фактической блокировкой для уведомления. Таким образом, рабочий поток, занятый обработкой элемента очереди, который "пропускает" notify_all(), увидит, в следующий раз, когда он проверяет условие очереди, что предикат (новый элемент доступен или, вся очередь завершена) изменилась.

Ответ 2

Недавно я решил эту проблему с помощью переменной одного условия и отдельной логической переменной для каждого производителя/работника. Предикат в функции ожидания в потоке потребителей может проверять эти флаги и принимать решение, которое производитель/работник выполнил условие.

Ответ 3

Возможно, это может сработать:

избавиться от прерывания.

 message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);
    {
        new_msg_notification.wait(lock,[&]{
            return !queue.empty() || stop;
        });

        if( !stop )
        {
            auto msg(std::move(queue.front()));
            queue.pop();
            return msg;
        }
        else
        {
            return NULL; //or some 'terminate' message
        }
}

В деструкторе замените interrupt.notify_all() на new_msg_notification.notify_all()