Недавно я сделал порт для С++ 11, используя std:: atomic тройного буфера, который будет использоваться в качестве механизма синхронизации concurrency. Идея этого подхода синхронизации потоков заключается в том, что для ситуации с производителем-потребителем, когда у вас есть производитель, который работает быстрее, чем потребитель, тройная буферизация может дать некоторые преимущества, поскольку поток производителя не будет "замедлен", если придется ждать для потребителя. В моем случае у меня есть физический поток, который обновляется со скоростью ~ 120 кадров в секунду и поток рендеринга, который работает на скорости ~ 60 кадров в секунду. Очевидно, что я хочу, чтобы поток рендеринга всегда получал самое последнее состояние, но я также знаю, что я буду пропускать много кадров из физического потока из-за разницы в ставках. С другой стороны, я хочу, чтобы мой физический поток поддерживал постоянную скорость обновления и не ограничивался медленным потоком рендеринга, блокирующим мои данные.
Оригинальный код C был сделан remis-мыслями, и полное объяснение находится в его blog. Я призываю всех, кто интересуется его чтением, для дальнейшего понимания первоначальной реализации.
Моя реализация может быть найдена здесь.
Основная идея состоит в том, чтобы иметь массив с тремя позициями (буферами) и атомным флагом, который сравнивается и заменяется, чтобы определить, какие элементы массива соответствуют какому состоянию в любой момент времени. Таким образом, только одна атомная переменная используется для моделирования всех 3 индексов массива и логики тройной буферизации. Буферные 3 позиции называются Dirty, Clean и Snap. Производитель всегда пишет индекс Dirty и может перевернуть запись, чтобы поменять местами Dirty с текущим индексом Clean. Потребитель может запросить новую привязку, которая свопирует текущий индекс Snap с индексом Clean, чтобы получить самый последний буфер. Пользователь всегда считывает буфер в позиции привязки.
Флаг состоит из 8-битного без знака int, а биты соответствуют:
(не используется) (новая запись) (2x грязная) (2x чистая) (2x привязка)
Дополнительный бит бит newWrite устанавливается автором и очищается читателем. Читатель может использовать это, чтобы проверить, были ли какие-либо записи с момента последней привязки, и если нет, это не займет еще одну привязку. Флаг и индексы могут быть получены с помощью простых побитовых операций.
Теперь для кода:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
реализация:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
Как вы можете видеть, я решил использовать шаблон Release-Consume для упорядочения памяти. Релиз (memory_order_release) для хранилища гарантирует, что никакие записи в текущем потоке не могут быть переупорядочены после магазина. С другой стороны, Consume гарантирует, что никакие чтения в текущем потоке, зависящие от загружаемого значения, могут быть переупорядочены до этой нагрузки. Это гарантирует, что записи в зависимые переменные в других потоках, выпускающих одну и ту же атомную переменную, будут видны в текущем потоке.
Если мое понимание правильное, так как мне нужны только те атомы, которые нужно установить атомом, операции с другими переменными, которые не влияют непосредственно на флаги, могут свободно переупорядочиваться компилятором, что позволяет увеличить оптимизацию. Из чтения некоторых документов по новой модели памяти я также знаю, что эти расслабленные атомы будут иметь заметное влияние только на такие платформы, как ARM и POWER (они были введены главным образом из-за них). Поскольку я нацелен на ARM, я считаю, что я мог бы извлечь выгоду из этих операций и уметь выжать немного больше производительности.
Теперь на вопрос:
Правильно ли я использую расслабленное упорядочение Release-Consume для этой конкретной проблемы?
Спасибо,
Андре
PS: Извините за длинный пост, но я полагал, что для лучшего обзора проблемы нужен какой-то достойный контекст.
РЕДАКТИРОВАТЬ: Реализованные предложения @Yakk:
- Исправлено
flags
чтение наnewSnap()
иflipWriter()
, которые использовали прямое назначение, поэтому использовали по умолчаниюload(std::memory_order_seq_cst)
. - Для ясности перемещены операции с чередованием бит на выделенные функции.
- Добавлен возвращаемый тип
bool
вnewSnap()
, теперь возвращает false, если в противном случае ничего нового и не было. - Определенный класс как не скопируемый с помощью
= delete
idiom, поскольку оба конструктора копирования и присваивания были небезопасными, если использовалсяTripleBuffer
.
ИЗМЕНИТЬ 2:Исправлено описание, которое было неверным (спасибо @Useless). Покупатель запрашивает новую привязку и считывает из индекса Snap (а не "писатель" ). Извините за отвлечение и спасибо Бесполезным за указание на это.
ИЗМЕНИТЬ 3:
Оптимизировал функции newSnap()
и flipriter()
в соответствии с предложениями @Display Name, эффективно удалив 2 избыточных load()
за цикл цикла.