Подтвердить что ты не робот

С++ 11 упорядочение атомной памяти - это правильное использование упорядоченного (освобождения-потребления) заказа?

Недавно я сделал порт для С++ 11, используя std:: atomic тройного буфера, который будет использоваться в качестве механизма синхронизации concurrency. Идея этого подхода синхронизации потоков заключается в том, что для ситуации с производителем-потребителем, когда у вас есть производитель, который работает быстрее, чем потребитель, тройная буферизация может дать некоторые преимущества, поскольку поток производителя не будет "замедлен", если придется ждать для потребителя. В моем случае у меня есть физический поток, который обновляется со скоростью ~ 120 кадров в секунду и поток рендеринга, который работает на скорости ~ 60 кадров в секунду. Очевидно, что я хочу, чтобы поток рендеринга всегда получал самое последнее состояние, но я также знаю, что я буду пропускать много кадров из физического потока из-за разницы в ставках. С другой стороны, я хочу, чтобы мой физический поток поддерживал постоянную скорость обновления и не ограничивался медленным потоком рендеринга, блокирующим мои данные.

Оригинальный код C был сделан remis-мыслями, и полное объяснение находится в его blog. Я призываю всех, кто интересуется его чтением, для дальнейшего понимания первоначальной реализации.

Моя реализация может быть найдена здесь.

Основная идея состоит в том, чтобы иметь массив с тремя позициями (буферами) и атомным флагом, который сравнивается и заменяется, чтобы определить, какие элементы массива соответствуют какому состоянию в любой момент времени. Таким образом, только одна атомная переменная используется для моделирования всех 3 индексов массива и логики тройной буферизации. Буферные 3 позиции называются Dirty, Clean и Snap. Производитель всегда пишет индекс Dirty и может перевернуть запись, чтобы поменять местами Dirty с текущим индексом Clean. Потребитель может запросить новую привязку, которая свопирует текущий индекс Snap с индексом Clean, чтобы получить самый последний буфер. Пользователь всегда считывает буфер в позиции привязки.

Флаг состоит из 8-битного без знака int, а биты соответствуют:

(не используется) (новая запись) (2x грязная) (2x чистая) (2x привязка)

Дополнительный бит бит newWrite устанавливается автором и очищается читателем. Читатель может использовать это, чтобы проверить, были ли какие-либо записи с момента последней привязки, и если нет, это не займет еще одну привязку. Флаг и индексы могут быть получены с помощью простых побитовых операций.

Теперь для кода:

template <typename T>
class TripleBuffer
{

public:

  TripleBuffer<T>();
  TripleBuffer<T>(const T& init);

  // non-copyable behavior
  TripleBuffer<T>(const TripleBuffer<T>&) = delete;
  TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;

  T snap() const; // get the current snap to read
  void write(const T newT); // write a new value
  bool newSnap(); // swap to the latest value, if any
  void flipWriter(); // flip writer positions dirty / clean

  T readLast(); // wrapper to read the last available element (newSnap + snap)
  void update(T newT); // wrapper to update with a new element (write + flipWriter)

private:

  bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
  uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
  uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

  // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
  // newWrite   = (flags & 0x40)
  // dirtyIndex = (flags & 0x30) >> 4
  // cleanIndex = (flags & 0xC) >> 2
  // snapIndex  = (flags & 0x3)
  mutable atomic_uint_fast8_t flags;

  T buffer[3];
};

реализация:

template <typename T>
TripleBuffer<T>::TripleBuffer(){

  T dummy = T();

  buffer[0] = dummy;
  buffer[1] = dummy;
  buffer[2] = dummy;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){

  buffer[0] = init;
  buffer[1] = init;
  buffer[2] = init;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
T TripleBuffer<T>::snap() const{

  return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT){

  buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  do {
    if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
      return false;
  } while(!flags.compare_exchange_weak(flagsNow,
                                       swapSnapWithClean(flagsNow),
                                       memory_order_release,
                                       memory_order_consume));
  return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  while(!flags.compare_exchange_weak(flagsNow,
                                     newWriteSwapCleanWithDirty(flagsNow),
                                     memory_order_release,
                                     memory_order_consume));
}

template <typename T>
T TripleBuffer<T>::readLast(){
    newSnap(); // get most recent value
    return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
    write(newT); // write new value
    flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
    // check if the newWrite bit is 1
    return ((flags & 0x40) != 0);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
    // swap snap with clean
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}

Как вы можете видеть, я решил использовать шаблон Release-Consume для упорядочения памяти. Релиз (memory_order_release) для хранилища гарантирует, что никакие записи в текущем потоке не могут быть переупорядочены после магазина. С другой стороны, Consume гарантирует, что никакие чтения в текущем потоке, зависящие от загружаемого значения, могут быть переупорядочены до этой нагрузки. Это гарантирует, что записи в зависимые переменные в других потоках, выпускающих одну и ту же атомную переменную, будут видны в текущем потоке.

Если мое понимание правильное, так как мне нужны только те атомы, которые нужно установить атомом, операции с другими переменными, которые не влияют непосредственно на флаги, могут свободно переупорядочиваться компилятором, что позволяет увеличить оптимизацию. Из чтения некоторых документов по новой модели памяти я также знаю, что эти расслабленные атомы будут иметь заметное влияние только на такие платформы, как ARM и POWER (они были введены главным образом из-за них). Поскольку я нацелен на ARM, я считаю, что я мог бы извлечь выгоду из этих операций и уметь выжать немного больше производительности.

Теперь на вопрос:

Правильно ли я использую расслабленное упорядочение Release-Consume для этой конкретной проблемы?

Спасибо,

Андре

PS: Извините за длинный пост, но я полагал, что для лучшего обзора проблемы нужен какой-то достойный контекст.

РЕДАКТИРОВАТЬ: Реализованные предложения @Yakk:

  • Исправлено flags чтение на newSnap() и flipWriter(), которые использовали прямое назначение, поэтому использовали по умолчанию load(std::memory_order_seq_cst).
  • Для ясности перемещены операции с чередованием бит на выделенные функции.
  • Добавлен возвращаемый тип bool в newSnap(), теперь возвращает false, если в противном случае ничего нового и не было.
  • Определенный класс как не скопируемый с помощью = delete idiom, поскольку оба конструктора копирования и присваивания были небезопасными, если использовался TripleBuffer.

ИЗМЕНИТЬ 2:Исправлено описание, которое было неверным (спасибо @Useless). Покупатель запрашивает новую привязку и считывает из индекса Snap (а не "писатель" ). Извините за отвлечение и спасибо Бесполезным за указание на это.

ИЗМЕНИТЬ 3: Оптимизировал функции newSnap() и flipriter() в соответствии с предложениями @Display Name, эффективно удалив 2 избыточных load() за цикл цикла.

4b9b3361

Ответ 1

Почему вы загружаете значение старых флагов дважды в циклы CAS? Первый раз - flags.load(), а второй - compare_exchange_weak(), который стандарт указывает на отказ CAS, загружает предыдущее значение в первый аргумент, который в этом случае является flagsNow.

Согласно http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange, "В противном случае загружает фактическое значение, хранящееся в * это в ожидаемое (выполняет операцию загрузки)." То, что делает ваш цикл, это то, что при ошибке compare_exchange_weak() перезагружает flagsNow, тогда цикл повторяется, и первый оператор загружает его еще раз, сразу после загрузки compare_exchange_weak(). Мне кажется, что ваша петля должна вместо этого вытащить нагрузку за пределы петли. Например, newSnap() будет:

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
    if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));

и flipWriter():

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));

Ответ 2

Да, это разница между memory_order_acquire и memory_order_consume, но вы не заметите этого, когда используете его 180 раз в секунду. Вы можете запустить мой тест с m2 = memory_order_consume, если вы хотите узнать ответ в цифрах. Просто измените имя производителя_or_consumer_Thread на что-то вроде этого:

TripleBuffer <int> tb;

void producer_or_consumer_Thread(void *arg)
{
    struct Arg * a = (struct Arg *) arg;
    bool succeeded = false;
    int i = 0, k, kold = -1, kcur;

    while (a->run)
    {
        while (a->wait) a->is_waiting = true; // busy wait
        if (a->producer)
        {
            i++;
            tb.update(i);
            a->counter[0]++;
        }
        else
        {
            kcur = tb.snap();
            if (kold != -1 && kcur != kold) a->counter[1]++;
            succeeded = tb0.newSnap();
            if (succeeded)
            {
                k = tb.readLast();
                if (kold == -1)
                    kold = k;
                else if (kold = k + 1)
                    kold = k;
                else
                    succeeded = false;
            }
            if (succeeded) a->counter[0]++;   
        }
    }
    a->is_waiting =  true;
}

ИСПЫТАНИЕ Результат:

_#_  __Produced __Consumed _____Total
  1    39258150   19509292   58767442
  2    24598892   14730385   39329277
  3    10615129   10016276   20631405
  4    10617349   10026637   20643986
  5    10600334    9976625   20576959
  6    10624009   10069984   20693993
  7    10609040   10016174   20625214
  8    25864915   15136263   41001178
  9    39847163   19809974   59657137
 10    29981232   16139823   46121055
 11    10555174    9870567   20425741
 12    25975381   15171559   41146940
 13    24311523   14490089   38801612
 14    10512252    9686540   20198792
 15    10520211    9693305   20213516
 16    10523458    9720930   20244388
 17    10576840    9917756   20494596
 18    11048180    9528808   20576988
 19    11500654    9530853   21031507
 20    11264789    9746040   21010829