У меня есть приложение в $work, где мне нужно перемещаться между двумя потоками реального времени, которые запланированы на разных частотах. (Фактическое планирование не под моим контролем.) Приложение жестко работает в режиме реального времени (один из потоков должен управлять аппаратным интерфейсом), поэтому передача данных между потоками должна быть заблокирована и без ожидания насколько возможно.
Важно отметить, что нужно переносить только один блок данных: поскольку два потока работают с разной скоростью, будут выполняться два итерации более быстрого потока между двумя пробуждениями более медленного потока; в этом случае нормально перезаписывать данные в буфере записи, чтобы более медленный поток получал только последние данные.
Другими словами, вместо очереди достаточно двойного буферизованного решения. Два буфера выделяются во время инициализации, а потоки чтения и записи могут вызывать методы класса для получения указателей на один из этих буферов.
Код С++:
#include <mutex>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}
~ProducerConsumerDoubleBuffer() { }
// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = true;
m_write_idx = 1 - m_read_idx;
return &m_buf[m_write_idx];
}
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_write_busy = false;
}
// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_write_busy) {
m_read_idx = m_write_idx;
}
return &m_buf[m_read_idx];
}
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);
m_read_idx = m_write_idx;
}
private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
};
Чтобы избежать устаревших данных в потоке считывателя, структура полезной нагрузки проверяется версией. Чтобы облегчить двунаправленную передачу данных между потоками, используются два экземпляра вышеупомянутого монстра в противоположных направлениях.
Вопросы:
- Является ли эта схема потокобезопасной? Если он сломан, где?
- Можно ли это сделать без мьютекса? Возможно, с помощью только барьеров памяти или инструкций CAS?
- Можно ли сделать лучше?