Подтвердить что ты не робот

Единый производитель, единая структура данных пользователя с двойным буфером в С++

У меня есть приложение в $work, где мне нужно перемещаться между двумя потоками реального времени, которые запланированы на разных частотах. (Фактическое планирование не под моим контролем.) Приложение жестко работает в режиме реального времени (один из потоков должен управлять аппаратным интерфейсом), поэтому передача данных между потоками должна быть заблокирована и без ожидания насколько возможно.

Важно отметить, что нужно переносить только один блок данных: поскольку два потока работают с разной скоростью, будут выполняться два итерации более быстрого потока между двумя пробуждениями более медленного потока; в этом случае нормально перезаписывать данные в буфере записи, чтобы более медленный поток получал только последние данные.

Другими словами, вместо очереди достаточно двойного буферизованного решения. Два буфера выделяются во время инициализации, а потоки чтения и записи могут вызывать методы класса для получения указателей на один из этих буферов.

Код С++:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

Чтобы избежать устаревших данных в потоке считывателя, структура полезной нагрузки проверяется версией. Чтобы облегчить двунаправленную передачу данных между потоками, используются два экземпляра вышеупомянутого монстра в противоположных направлениях.

Вопросы:

  • Является ли эта схема потокобезопасной? Если он сломан, где?
  • Можно ли это сделать без мьютекса? Возможно, с помощью только барьеров памяти или инструкций CAS?
  • Можно ли сделать лучше?
4b9b3361

Ответ 1

Очень интересная проблема! Дорога сложнее, чем я сначала подумал:-) Мне нравятся свободные от блокировки решения, поэтому я попытался работать один из них ниже.

Есть много способов думать об этой системе. Вы можете моделировать это как циклический буфер/очередь фиксированного размера (с двумя записями), но затем вы теряете возможность обновлять следующее доступное значение для потребления, поскольку вы не знаете, начал ли потребитель читать недавно опубликованные значение или по-прежнему (потенциально) чтение предыдущего. Таким образом, дополнительное состояние необходимо за пределами стандартного кольцевого буфера, чтобы достичь более оптимального Решение.

Прежде всего, обратите внимание, что всегда есть ячейка, которую производитель может безопасно записать в в любой момент времени; если одна ячейка считывается потребителем, другие могут быть записаны. Позвоните в ячейку, которую можно безопасно записать в "активная" ячейка (ячейка, которая может быть потенциально прочитана, - это какая-либо ячейка не активный). Активная ячейка может быть переключена только в том случае, если другая ячейка не в настоящее время считывается с.

В отличие от активной ячейки, которую всегда можно записать, неактивная ячейка может только читать, если он содержит значение; как только это значение будет потреблено, оно исчезло. (Это означает, что в случае агрессивного производителя избегается ливня, а на некоторых точка, потребитель будет опорожнять ячейку и перестанет касаться клеток. однажды что происходит, производитель может определенно опубликовать значение, тогда как до этого момента, он может опубликовать только значение (изменить активную ячейку), если потребитель не находится в середина прочитанного.)

Если есть значение, которое готово к употреблению, только потребитель может изменить это факт (для неактивной ячейки, во всяком случае); последующие производства могут изменить, какая ячейка активна и опубликованная величина, но значение всегда будет готово к чтению до тех пор, пока он потреблял.

Как только производитель будет записан в активную ячейку, он может "опубликовать" это значение на изменение ячейки, являющейся активной (замена индекса), при условии, что потребитель а не в середине чтения другой ячейки. Если потребитель находится в середине читая другую ячейку, своп не может произойти, но в этом случае потребитель может обменять после того, как он будет читать значение, если производитель не находится в середине напишите (и если это так, то производитель поменяет его, как только это будет сделано). Фактически, в целом потребитель всегда может поменяться после его чтения (если это единственный доступ к системе), поскольку ложные свопы потребителя являются доброкачественными: если есть что-то в другой ячейке, тогда обмен будет заставлять это читать далее, и если там нет, замена меняет ничего.

Итак, нам нужна общая переменная для отслеживания активной ячейки. Нам также нужна как для производителя, так и для потребителя, чтобы указать, находятся ли они в середине операция. Мы можем хранить эти три части состояния в одной атомной переменной, чтобы чтобы иметь возможность воздействовать на них всех сразу (атомарно). Нам также нужен способ, чтобы потребитель мог проверить, есть ли что-либо в неактивная ячейка в первую очередь, и для обоих потоков для изменения этого состояния при необходимости. Я попробовал несколько других подходов, но, в конце концов, проще всего было просто включить эту информацию и в другую атомную переменную. Это делает многое проще рассуждать, поскольку все изменения состояния в системе являются атомарными таким образом.

Я придумал безжизненную реализацию (без блокировки, и все операции завершены в ограниченном числе инструкций).

Время кода!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

Обратите внимание, что семантика такова, что потребитель никогда не может прочитать данное значение дважды, и значение, которое оно читает, всегда новее, чем последнее прочитанное значение. Это также справедливо эффективный в использовании памяти (два буфера, например, ваше исходное решение). Я избегал циклов CAS потому что они обычно менее эффективны, чем одна атомная операция в конфликте.

Если вы решите использовать вышеуказанный код, я предлагаю вам сначала написать для него полные (поточные) модульные тесты. И правильные тесты. Я проверил его, но только чуть-чуть. Дайте мне знать, если вы найдете ошибки: -)

Мой unit test:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

Что касается вашей первоначальной реализации, я только смотрел на нее с легкостью (это гораздо интереснее дизайн нового материала, хех), но ответ david.pfx, похоже, затрагивает ту часть вашего вопроса.

Ответ 2

Да, я думаю, что он сломан.

Если читатель запускает/заканчивает/запускает подряд, он обновляет свой индекс чтения до индекса записи и потенциально считывает данные из индекса записи, даже если запись занята.

Проблема заключается в том, что писатель не знает, какой буфер будет использовать читатель, поэтому писатель должен гарантировать, что оба буфера действительны в любое время. Он не может этого сделать, если потребуется какое-то время для записи данных в буфер [если я не понял какую-то логику, которая здесь не показана).

Да, я думаю, что это можно сделать без блокировок, используя CAS или эквивалентную логику. Я не собираюсь пытаться выразить алгоритм в этом пространстве. Я уверен, что он существует, но не тот, который я могу записать правильно в первый раз. И некоторые поиски в Интернете оказались некоторыми правдоподобными кандидатами. Wait-free IPC с использованием CAS представляется довольно интересной темой и предметом некоторых исследований.


После некоторой дополнительной мысли алгоритм выглядит следующим образом. Вам нужно:

  • 3 буфера: один для писателя, один для чтения и один дополнительный. Буферы упорядочены: они образуют кольцо (но см. Примечание).
  • Состояние для каждого буфера: свободное, полное, письмо, чтение.
  • Функция, которая может проверять состояние буфера и условно изменять состояние на другое значение в одной атомной операции. Я буду использовать CSET для этого.

Автор:

Find the first buffer that is FREE or FULL
  Fail: assert (should never fail, reader can only use one buffer)
  CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

Читатель:

Find first buffer that is FULL
    Fail: wait (writer may be slow)
    CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

Примечание. Этот алгоритм не гарантирует, что буферы обрабатываются строго по порядку прибытия, и никакие простые изменения не заставят его сделать это. Если это важно, алгоритм должен быть расширен с порядковым номером в буфере, установленным автором, чтобы читатель мог выбрать самый последний буфер.

Я оставляю код как деталь реализации.


Функция CSET нетривиальна. Он должен атомарно проверить, что конкретное место общей памяти равно ожидаемому значению, и если это так изменит его на новое значение. Он возвращает true, если он успешно сделал изменение и false в противном случае. Реализация должна избегать условий гонки, если два потока одновременно получат одно и то же место (и, возможно, на разных процессорах).

Стандартная атомная операционная библиотека С++ содержит набор функций atomic_compare_exchange, которые должны служить цели, если они доступны.

Ответ 3

Здесь версия с использованием InterlockedExchangePointer() и SLIST.

Это решение не поддерживает повторное чтение последнего буфера. Но если это необходимо, это можно сделать на стороне читателя с помощью копии и if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... }.
Это не сделано, потому что его сложно добавить, а потому, что это не очень реалистично. Представьте, что ваше последнее известное значение становится старше и старше - секунды, дни, недели. Маловероятно, что приложение все равно захочет его использовать. Таким образом, функция повторного чтения факторинга в код двойного буфера отнимает гибкость от приложения.

В двойном буфере есть 1 элемент указателя чтения. Когда вызывается beginRead(), это значение возвращается и атомарно заменяется на NULL. Подумайте об этом как "Читатель ПРИНИМАЕТ буфер".
С помощью endRead() читатель возвращает буфер и добавляется в SLIST, содержащий доступные буферы для операций записи.

Изначально оба буфера добавляются в SLIST, указатель чтения имеет значение NULL.

beginWrite() выводит следующий доступный буфер из SLIST. И это значение никогда не может быть NULL, из-за реализации endWrite().

Последнее, что не менее важно, endWrite() атомарно меняет указатель чтения с возвращенным, свежеписанным буфером, и если указатель чтения не был NULL, он помещает его в SLIST.

Таким образом, даже если считывающая сторона никогда не читает, сторона-автор никогда не исчерпывает буферы. Когда читатель читает, он получает последнее известное значение (один раз!).

То, что эта реализация небезопасно, заключается в том, что есть несколько параллельных читателей или писателей. Но это не было целью в первую очередь.

На уродливой стороне Буферы должны быть структурами с некоторым членом SLIST_HEADER сверху.

Вот, код, но имейте в виду, что это не моя вина, если ваш марсоход попадает на Вену!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
    SLIST_ENTRY listNode;
    uint8_t data[MAX_DATA_SIZE];
    size_t length;
} DataItem_t;

class CDoubleBuffer
{
    SLIST_HEADER m_writePointers;
    DataItem_t m_buffers[2];
    volatile DataItem_t *m_readPointer;

public:
    CDoubleBuffer()
        : m_writePointers()
        , m_buffers()
        , m_readPointer(NULL)
    {
        InitializeSListHead(&m_writePointers);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
    }
    DataItem_t *beginRead()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
        return result;
    }
    void endRead(DataItem_t *dataItem)
    {
        if (NULL != dataItem)
        {
            InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
        }
    }
    DataItem_t *beginWrite()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
        return result;
    }
    void endWrite(DataItem_t *dataItem)
    {
        DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
        if (NULL != oldReadPointer)
        {
            InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
        }
    }
};

И вот тестовый код для него. (Для обоих приведенных выше и тестового кода вам нужно < windows.h > и < assert.h > .)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);