Подтвердить что ты не робот

Круглый блокировочный буфер

Я занимаюсь разработкой системы, которая подключается к одному или нескольким потокам фидов данных и делает некоторый анализ данных, чем триггерные события на основе результата. В типичной многопоточной настройке производителя/потребителя у меня будет несколько потоков производителей, которые помещают данные в очередь, и несколько потребительских потоков, считывающих данные, а потребители интересуются только последней точкой данных плюс n количеством точек. Нити производителя должны будут блокироваться, если медленный потребитель не может идти в ногу, и, конечно, потоки потребителей будут блокироваться, когда нет необработанных обновлений. Использование типичной параллельной очереди с блокировкой чтения/записи будет работать хорошо, но скорость ввода данных может быть огромной, поэтому я хотел уменьшить свои блокирующие накладные расходы, особенно блокировки для производителей. Я думаю, что круговой буфер без блокировки - это то, что мне нужно.

Теперь два вопроса:

  • Является ли круглый буфер без блокировки ответом?

  • Если это так, прежде чем я скачу свой собственный, вы знаете какую-либо публичную реализацию, которая будет соответствовать моей потребности?

Любые указатели на реализацию циклического незакрепленного буфера всегда приветствуются.

BTW, делая это на С++ в Linux.

Дополнительная информация:

Время отклика имеет решающее значение для моей системы. В идеале потребительские потоки захотят увидеть как можно скорее любые обновления, поскольку дополнительная 1 миллисекундная задержка может сделать систему бесполезной или стоит намного меньше.

Идея дизайна, к которой я склоняюсь, представляет собой круглосуточный буфер с замкнутым циклом, в котором потоки производителей помещают данные в буфер так быстро, как можно, позвольте вызывать головку буфера A без блокировки, если только буфер не является полный, когда A встречает конец буфера Z. Потребительские потоки будут содержать два указателя на круговой буфер, P и P n, где P - это локальная головка буфера потока, а P n - n-й элемент после P. Каждый потребительский поток будет продвигать свои P и P n после завершения обработки текущего P, а конец указателя буфера Z продвигается с самым медленным P nсуб > . Когда P улавливает до A, что означает, что больше не требуется новое обновление для обработки, потребитель вращается и занят, ожидая, чтобы A снова продвинулся. Если потребительский поток вращается слишком долго, его можно уложить и ждать переменной условия, но я в порядке с потребителем, занимающим процессорный цикл, ожидающим обновления, потому что это не увеличивает мою задержку (у меня будет больше ядер процессора чем потоки). Представьте, что у вас есть круговой трек, и производитель работает перед кучей потребителей, ключевым моментом является настройка системы, так что производитель обычно работает на несколько шагов впереди потребителей, и большая часть этих операций может быть выполненных с использованием технологий блокировки. Я понимаю, что получить детали прав реализации непросто... хорошо, очень сложно, поэтому я хочу учиться на чужих ошибках, прежде чем сделать несколько своих.

4b9b3361

Ответ 2

За последние пару лет я специально изучил структуры данных без блокировки. Я прочитал большую часть статей в этой области (там всего около четырех или около того), хотя только около десяти или пятнадцати являются реальным использованием: -)

AFAIK, замкнутый круглый буфер не был изобретен. Проблема будет заключаться в сложном условии, когда читатель обгоняет писателя или наоборот.

Если вы не потратили, по крайней мере, шесть месяцев на изучение незакрепленных структур данных, не пытайтесь написать их самостоятельно. Вы ошибетесь, и вам может быть неочевидно, что существуют ошибки, пока ваш код не завершится после развертывания на новых платформах.

Я считаю, что есть решение вашего требования.

Вы должны соединить незаблокированную очередь со свободным списком без блокировки.

Свободный список предоставит вам предварительное выделение и, таким образом, устранит (заведомо дорогое) требование для блокирующего блока; когда свободный список пуст, вы копируете поведение циклического буфера, мгновенно удаляя элемент из очереди и используя это вместо.

(Конечно, в кольцевом буфере на основе блокировки, как только блокировка будет получена, получение элемента очень быстрое - в основном просто разыменование указателя - но вы не получите этого в любом алгоритме без блокировки, они часто должны идти хорошо, чтобы сделать что-то, накладные расходы, связанные с отказом во всплывающей подсказке со списком свободных мест, сопровождаемой декомпрессией, наравне с объемом работы, который должен выполнять любой алгоритм без блокировки).

Майкл и Скотт разработали действительно хорошую бесплатную очередь в 1996 году. Ссылка ниже даст вам достаточно подробностей, чтобы отследить PDF-документ своей статьи; Майкл и Скотт, FIFO

Свободный список без блокировки - это самый простой алгоритм без блокировки, и на самом деле я не думаю, что я видел для него фактическую бумагу.

Ответ 3

Требование, чтобы производители или потребители блокировали, если буфер пуст или заполнен, предполагает, что вы должны использовать нормальную структуру блокировки данных с семафорами или переменными условия, чтобы блоки производителей и потребителей блокировались до тех пор, пока не будут доступны данные. Код без блокировки обычно не блокируется при таких условиях - он вращается или отказывается от операций, которые не могут быть выполнены, а не блокировки с использованием ОС. (Если вы можете позволить себе подождать, пока другой поток не произведет или не будет потреблять данные, то почему ожидания на блокировке для другого потока завершают обновление структуры данных хуже?)

Вкл. (x86/x64) Linux, внутрипоточная синхронизация с использованием мьютексов является разумно дешевой, если нет конкуренции. Сосредоточьтесь на минимизации времени, которое производители и потребители должны удерживать на своих замках. Учитывая, что вы сказали, что заботитесь только о последних N записанных точках данных, я думаю, что круговой буфер будет делать это достаточно хорошо. Тем не менее, я не совсем понимаю, как это соответствует требованиям блокировки и идее потребителей, которые фактически потребляют (удаляют) данные, которые они читают. (Вы хотите, чтобы потребители смотрели только на последние N точек данных, а не удаляли их? Хотите, чтобы продюсеры не заботились о том, не могут ли потребители не отставать и просто перезаписать старые данные?)

Кроме того, как прокомментировал Zan Lynx, вы можете агрегировать/буферизовать свои данные в более крупные куски, когда у вас есть много всего этого. Вы можете заполнить фиксированное количество точек или все данные, полученные в течение определенного количество времени. Это означает, что операций синхронизации будет меньше. Однако он вводит задержку, но если вы не используете Linux в режиме реального времени, вам все равно придется иметь дело с этим.

Ответ 4

Существует довольно хорошая серия статей об этом в DDJ. В качестве признака того, насколько сложно это может быть, это исправление более ранней статьи, в которой было неправильно. Удостоверьтесь, что вы понимаете ошибки, прежде чем откатывать свои собственные) -;

Ответ 5

Реализация в библиотеке ускорения стоит рассмотреть. Он прост в использовании и довольно высокой производительности. Я написал тест и запустил его на четырехъядерном ноутбуке i7 (8 потоков) и получил операцию ~ 4M enqueue/dequeue. Другая реализация, не упомянутая до сих пор, - это очередь MPMC в http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue. Я сделал несколько простых тестов с этой реализацией на одном ноутбуке с 32 производителями и 32 потребителями. Это, как было объявлено, быстрее, чем остановка без блокировки.

Как и большинство других ответов, состояние беззаботного программирования затруднено. В большинстве реализаций будет сложно обнаружить угловые случаи, которые требуют большого количества тестирования и отладки для исправления. Обычно они фиксируются с тщательным размещением барьеров памяти в коде. Вы также найдете доказательства правильности, опубликованные во многих академических статьях. Я предпочитаю тестировать эти реализации с помощью инструмента грубой силы. Любой блокирующий алгоритм, который вы планируете использовать в процессе производства, должен быть проверен на правильность с помощью инструмента, такого как http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html.

Ответ 6

Одним из полезных методов сокращения конкуренции является хэш элементов в несколько очередей и каждый пользователь, посвященный теме.

Для большинства последних элементов, которые интересуют ваши потребители, вы не хотите блокировать всю очередь и перебирать ее, чтобы найти элемент для переопределения - просто публиковать элементы в N-кортежах, т.е. Все N последних элементов. Бонусные баллы для реализации, когда производитель блокирует полную очередь (когда потребители не могут идти в ногу) с тайм-аутом, обновляя свой локальный кеш кортежа - таким образом вы не ставите обратное давление на источник данных.

Ответ 7

Я согласен с этой статьей и рекомендую не использовать блокировки данных. Сравнительно недавно опубликованная статья о беспорядочных очередях fifo this, ищите дальнейшие документы того же автора (ов); существует также кандидатская диссертация о Чалмерсе о незакрепленных структурах данных (я потерял ссылку). Тем не менее, вы не сказали, насколько велики ваши элементы - блокированные данные не работают эффективно только с элементами размера слова, поэтому вам придется динамически выделять свои элементы, если они больше машинного слова (32 или 64 биты). Если вы динамически выделяете элементы, вы переносите (предположительно, поскольку вы не профилировали свою программу и в основном выполняете преждевременную оптимизацию) узкое место в распределителе памяти, поэтому вам нужен блокировщик памяти без блокировки, например Streamflow и интегрировать его с вашим приложением.

Ответ 8

Очередь Sutter неоптимальна, и он это знает. Программное обеспечение Art of Multicore является отличной ссылкой, но не доверяет Java-ребятам по моделям памяти, периоду. Росс не даст вам определенного ответа, потому что у них были библиотеки в таких проблемах и т.д.

Выполнение незакрепленного программирования задает проблемы, если вы не хотите тратить много времени на то, что вы явно переучиваете, прежде чем решить проблему (судя по описанию, это обычное безумие "поиск совершенства" в отношении когерентности кеша). Это занимает годы и приводит к тому, что сначала не решает проблемы, а затем оптимизирует, распространенное заболевание.

Ответ 9

Я не являюсь экспертом моделей аппаратной памяти и не блокирует структуры данных, и я стараюсь избегать использования тех, что есть в моих проектах, и я иду с традиционными заблокированными структурами данных.

Однако я недавно заметил, что видео: Неактивная очередь SPSC на основе кольцевого буфера

Это основано на высокопроизводительной Java-библиотеке с открытым кодом, называемой LMAX distribuptor, используемой торговой системой: LMAX Distruptor

Основываясь на вышеприведенной презентации, вы делаете головные и хвостовые указатели атомными и атомистически проверяете состояние, при котором голова улавливает хвост сзади или наоборот.

Ниже вы можете увидеть для него очень базовую реализацию С++ 11:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}

Ответ 10

Это старый поток, но, поскольку он еще не упоминался, есть свободный, круглый, 1 производитель → 1 потребитель, FIFO, доступный в структуре JUCE С++.

https://www.juce.com/doc/classAbstractFifo#details

Ответ 11

Вот как я это сделаю:

  • отобразить очередь в массив
  • сохранить состояние со следующей следующей и следующей следующей индексами записи
  • сохранить пустой вектор с полным битом вокруг

Вставка состоит из использования CAS с приращением и пересчета на следующую запись. После того, как у вас есть слот, добавьте свое значение, а затем установите пустой/полный бит, который соответствует ему.

Для удаления требуется проверка бит перед тем, как протестировать при потоках, но кроме этого, такие же, как для записи, но с использованием индекса чтения и очистки пустого/полного бита.

Будьте предупреждены,

  • Я не знаю этих вещей
  • атомные ASM-операции кажутся очень медленными, когда я их использовал, поэтому, если вы закончите с несколькими из них, вы можете быстрее использовать блокировки, встроенные в функции insert/remove. Теория состоит в том, что один атомный op для захвата блокировки, за которым следуют (очень) несколько неатомных ASM ops, может быть быстрее, чем то же самое, что и несколько атомных op. Но для того, чтобы эта работа требовала ручного или автоматического встраивания, так что все это один короткий блок ASM.

Ответ 12

Просто для полноты: хорошо протестированный кольцевой буфер без блокировки в OtlContainers, но он написан в Delphi (TOmniBaseBoundedQueue является круглым buffer и TOmniBaseBoundedStack - ограниченный стек). Также существует неограниченная очередь в одном блоке (TOmniBaseQueue). Неограниченная очередь описана в Динамическая блокировка - сделайте это правильно. Начальная реализация ограниченной очереди (круговой буфер) была описана в незаблокированная очередь, наконец!, но код был обновлен с тех пор.

Ответ 13

Отъезд Disruptor (Как его использовать), который является кольцевым буфером, на который могут подписаться несколько потоков:

Ответ 14

Хотя это старый вопрос, никто не упоминал DPDK заблокированный буфер буфера. Это высокопроизводительный кольцевой буфер, поддерживающий несколько производителей и несколько потребителей. Он также обеспечивает режимы одного потребителя и одного производителя, а кольцевой буфер не работает в режиме SPSC. Он написан на C и поддерживает несколько архитектур.

Кроме того, он поддерживает режимы Bulk и Burst, в которых элементы могут быть выставлены в очередь или выгружены навалом. Дизайн позволяет нескольким потребителям или нескольким производителям одновременно записывать в очередь простым резервированием пространства путем перемещения атомарного указателя.