Сначала немного контекста. Я изучаю потоки в С++ 11 и для этой цели я пытаюсь создать небольшой класс actor
, по существу ( Я оставил данные обработки исключений и распространения) так:
class actor {
private: std::atomic<bool> stop;
private: std::condition_variable interrupt;
private: std::thread actor_thread;
private: message_queue incoming_msgs;
public: actor()
: stop(false),
actor_thread([&]{ run_actor(); })
{}
public: virtual ~actor() {
// if the actor is destroyed, we must ensure the thread dies too
stop = true;
// to this end, we have to interrupt the actor thread which is most probably
// waiting on the incoming_msgs queue:
interrupt.notify_all();
actor_thread.join();
}
private: virtual void run_actor() {
try {
while(!stop)
// wait for new message and process it
// but interrupt the waiting process if interrupt is signaled:
process(incoming_msgs.wait_and_pop(interrupt));
}
catch(interrupted_exception) {
// ...
}
};
private: virtual void process(const message&) = 0;
// ...
};
Каждый актер работает в своем собственном actor_thread
, ждет нового входящего сообщения на incoming_msgs
и - когда приходит сообщение - обрабатывает его.
actor_thread
создается вместе с actor
и должен умереть вместе с ним, поэтому мне нужен какой-то механизм прерывания в message_queue::wait_and_pop(std::condition_variable interrupt)
.
По сути, я требую, чтобы блоки wait_and_pop
до тех пор, пока
а) появляется новый message
или
b) до тех пор, пока не будет запущен interrupt
, и в этом случае - в идеале - interrupted_exception
.
Прибытие нового сообщения в message_queue
в настоящее время смоделировано также с помощью std::condition_variable new_msg_notification
:
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
// How to interrupt the following, when interrupt fires??
new_msg_notification.wait(lock,[&]{
return !queue.empty();
});
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
Чтобы сократить длинную историю, вопрос заключается в следующем: как мне прерывать ожидание нового сообщения в new_msg_notification.wait(...)
при срабатывании interrupt
(без введения тайм-аута )?
В качестве альтернативы вопрос может быть прочитан как: Как подождать, пока не будет сигнализирован какой-либо один из двух std::condition_variable
?
Один наивный подход, по-видимому, заключается не в том, чтобы использовать std::condition_variable
вообще для прерывания, а вместо этого просто использовать атомный флаг std::atomic<bool> interrupted
, а затем занят ожидание на new_msg_notification
с очень малым тайм-аутом, пока не появится новое сообщение прибыл или до true==interrupted
. Тем не менее, я очень хотел бы избежать ожидания.
EDIT:
Из комментариев и ответа от пикрова, похоже, что в принципе возможны два подхода.
- Задать специальное сообщение "Terminate", предложенное Аланом, мукундой и пикрой. Я решил отказаться от этого варианта, потому что я понятия не имею о размере очереди в то время, когда я хочу, чтобы актер закончил работу. Это может быть очень хорошо (как это обычно бывает, когда я хочу что-то быстро прекратить), что в очереди осталось несколько сообщений, и кажется неприемлемым ждать их обработки до тех пор, пока окончательное сообщение не получит включите.
- Реализовать пользовательскую версию переменной условия, которая может быть прервана другим потоком путем перенаправления уведомления на переменную условия, в которой ожидает первый поток. Я выбрал этот подход.
Для тех, кто вас интересует, моя реализация идет следующим образом. Переменная условия в моем случае на самом деле является semaphore
(потому что мне они нравятся больше и потому, что мне это нравится). Я снабдил этот семафор ассоциированным interrupt
, который можно получить из семафора через semaphore::get_interrupt()
. Если теперь один поток блокируется в semaphore::wait()
, другой поток имеет возможность вызывать semaphore::interrupt::trigger()
при прерывании семафора, заставляя первый поток разблокировать и распространять interrupt_exception
.
struct
interrupt_exception {};
class
semaphore {
public: class interrupt;
private: mutable std::mutex mutex;
// must be declared after our mutex due to construction order!
private: interrupt* informed_by;
private: std::atomic<long> counter;
private: std::condition_variable cond;
public:
semaphore();
public:
~semaphore() throw();
public: void
wait();
public: interrupt&
get_interrupt() const { return *informed_by; }
public: void
post() {
std::lock_guard<std::mutex> lock(mutex);
counter++;
cond.notify_one(); // never throws
}
public: unsigned long
load () const {
return counter.load();
}
};
class
semaphore::interrupt {
private: semaphore *forward_posts_to;
private: std::atomic<bool> triggered;
public:
interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
assert(forward_posts_to);
std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
forward_posts_to->informed_by = this;
}
public: void
trigger() {
assert(forward_posts_to);
std::lock_guard<std::mutex>(forward_posts_to->mutex);
triggered = true;
forward_posts_to->cond.notify_one(); // never throws
}
public: bool
is_triggered () const throw() {
return triggered.load();
}
public: void
reset () throw() {
return triggered.store(false);
}
};
semaphore::semaphore() : counter(0L), informed_by(new interrupt(this)) {}
// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw() {
delete informed_by;
}
void
semaphore::wait() {
std::unique_lock<std::mutex> lock(mutex);
if(0L==counter) {
cond.wait(lock,[&]{
if(informed_by->is_triggered())
throw interrupt_exception();
return counter>0;
});
}
counter--;
}
Используя этот semaphore
, моя реализация очереди сообщений теперь выглядит так (используя семафор вместо std::condition_variable
, я мог бы избавиться от std::mutex
:
class
message_queue {
private: std::queue<message> queue;
private: semaphore new_msg_notification;
public: void
push(message&& msg) {
queue.push(std::move(msg));
new_msg_notification.post();
}
public: const message
wait_and_pop() {
new_msg_notification.wait();
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
public: semaphore::interrupt&
get_interrupt() const { return new_msg_notification.get_interrupt(); }
};
My actor
теперь может прерывать поток с очень низкой задержкой в потоке. Реализация в настоящее время выглядит следующим образом:
class
actor {
private: message_queue
incoming_msgs;
/// must be declared after incoming_msgs due to construction order!
private: semaphore::interrupt&
interrupt;
private: std::thread
my_thread;
private: std::exception_ptr
exception;
public:
actor()
: interrupt(incoming_msgs.get_interrupt()), my_thread(
[&]{
try {
run_actor();
}
catch(...) {
exception = std::current_exception();
}
})
{}
private: virtual void
run_actor() {
while(!interrupt.is_triggered())
process(incoming_msgs.wait_and_pop());
};
private: virtual void
process(const message&) = 0;
public: void
notify(message&& msg_in) {
incoming_msgs.push(std::forward<message>(msg_in));
}
public: virtual
~actor() throw (interrupt_exception) {
interrupt.trigger();
my_thread.join();
if(exception)
std::rethrow_exception(exception);
}
};
Я также добавил полный рабочий пример в http://goo.gl/2GDAa1