Подтвердить что ты не робот

Lock Free Queue - Single Producer, несколько потребителей

Я ищу метод для реализации блокированной структуры данных очереди, которая поддерживает один производитель и несколько потребителей. Я рассмотрел классический метод Магеда Майкла и Майкла Скотта (1996), но их версия использует связанные списки. Мне нужна реализация, которая использует ограниченный круговой буфер. Что-то, что использует атомные переменные?

С другой стороны, я не уверен, почему эти классические методы предназначены для связанных списков, для которых требуется много динамического управления памятью. В многопоточной программе все процедуры управления памятью сериализуются. Разве мы не побеждаем преимущества бесконтактных методов, используя их в сочетании с динамическими структурами данных?

Я пытаюсь закодировать это в C/С++, используя библиотеку pthread в 64-битной архитектуре Intel.

Спасибо, Shirish

4b9b3361

Ответ 1

Использование кругового буфера делает блокировку необходимой, так как блокировка необходима, чтобы предотвратить прохождение головы мимо хвоста. Но в противном случае головные и хвостовые указатели могут легко обновляться атомарно. Или в некоторых случаях буфер может быть настолько большим, что перезапись не является проблемой. (в реальной жизни вы увидите это в автоматизированных торговых системах с круговыми буферами, имеющими размер X-минутных рыночных данных. Если вы отстаете на X минут, у вас есть более сложные проблемы, чем перезапись буфера).

Когда я реализовал очередь MS в С++, я построил свободный от блокировки распределитель, используя стек, который очень легко реализовать. Если у меня есть MSQueue, то во время компиляции я знаю sizeof (MSQueue:: node). Затем я делаю стек из N буферов требуемого размера. N может расти, т.е. Если pop() возвращает null, легко перейти к куче для большего количества блоков, и они будут перенесены в стек. Вне возможного блокирующего вызова для большей памяти это операция блокировки.

Обратите внимание, что T не может иметь нетривиальный dtor. Я работал над версией, которая позволяла использовать нетривиальные dtors, которые действительно работали. Но я обнаружил, что проще просто сделать T указателем на T, который я хотел, где производитель выпустил право собственности, и потребитель приобрел право собственности. Это, конечно, требует, чтобы сам Т был выделен с использованием методов lockfree, но тот же самый распределитель, который я сделал с помощью стека, также работает здесь.

В любом случае точка программирования с блокировкой не в том, что сами структуры данных медленнее. Точки таковы:

  • lock free делает меня независимым от планировщика. Программирование на основе блокировки зависит от планировщика, чтобы убедиться, что держатели блокировки работают, чтобы они могли освободить блокировку. Это то, что вызывает "инверсию приоритета". В Linux есть некоторые атрибуты блокировки, чтобы убедиться, что это происходит.
  • Если я не зависим от планировщика, у ОС гораздо меньше времени на управление временными рядами, и я получаю гораздо меньше переключений контекста.
  • легче написать правильные многопоточные программы, используя методы lockfree, поскольку мне не нужно беспокоиться о тупике, livelock, планировании, синхронизации и т.д. Это особенно актуально с реализациями с общей памятью, где процесс может умереть при сохранении блокировки в общих памяти, и нет возможности освободить блокировку.
  • методы блокировки намного проще масштабировать. Фактически, я реализовал методы блокировки с использованием обмена сообщениями по сети. Распределенные блокировки, подобные этому, являются кошмаром.

Тем не менее, есть много случаев, когда блокировочные методы предпочтительнее и/или требуются

  • при обновлении дорогостоящих или невозможных копий. Большинство методов блокировки используют какое-то управление версиями, т.е. Делают копию объекта, обновляют его и проверяют, является ли общая версия тем же, что и при ее копировании, а затем обновляйте текущую версию. Els ecopy снова, примените обновление и снова проверьте. Продолжайте делать это, пока это не сработает. Это хорошо, когда объекты небольшие, но они большие или содержат файлы, и т.д., А затем не рекомендуются.
  • Большинство типов недоступны для доступа без блокировки, например. любой контейнер STL. Они имеют инварианты, которые требуют неатомного доступа, например assert (vector.size() == vector.end() - vector.begin()). Поэтому, если вы обновляете/читаете вектор, который является общим, вам необходимо заблокировать его.

Ответ 2

Это старый вопрос, но никто не принял приемлемого решения. Поэтому я предлагаю эту информацию для других, которые могут искать.

Этот веб-сайт: http://www.1024cores.net

Предоставляет некоторые действительно полезные блокировки с защитой от несанкционированного доступа/бездействующей структуры данных с подробными объяснениями.

То, что вы ищете, - это решение без проблем для чтения/записи.

Смотрите: http://www.1024cores.net/home/lock-free-algorithms/reader-writer-problem

Ответ 3

Для традиционного одноблочного циклического буфера я думаю, что это просто невозможно сделать с помощью атомных операций. Вы должны сделать так много в одном чтении. Предположим, что у вас есть структура, которая имеет это:

uint8_t* buf;
unsigned int size; // Actual max. buffer size
unsigned int length; // Actual stored data length (suppose in write prohibited from being > size)
unsigned int offset; // Start of current stored data

При чтении вам нужно сделать следующее (так оно и было реализовано мной, вы можете поменять некоторые шаги, как я расскажу позже):

  • Убедитесь, что длина чтения не превышает сохраненную длину.
  • Убедитесь, что длина смещения + чтения не превышает границы буфера.
  • Чтение данных
  • Увеличение смещения, уменьшение длины

Что вы должны сделать синхронизированным (таким образом, атомарным), чтобы сделать эту работу? Собственно объедините шаги 1 и 4 на одном атомном шаге или уточните: выполните эту синхронизацию:

  • проверить read_length, это может быть как read_length=min(read_length,length);
  • уменьшить длину с помощью read_length: length-=read_length
  • получить локальную копию со смещением unsigned int local_offset = offset
  • увеличить смещение с помощью read_length: offset+=read_length

После этого вы можете просто создать memcpy (или что-то еще), начиная с вашего local_offset, проверить, читает ли ваш прокручиваемый круговой размер буфера (разделен на 2 memcpy),.... Это "совершенно" потокобезопасно, ваш метод записи все равно может записывать по памяти, которую вы читаете, поэтому убедитесь, что ваш буфер действительно достаточно большой, чтобы минимизировать эту возможность.

Теперь, хотя я могу представить, что вы можете комбинировать 3 и 4 (я думаю, что они делают в случае с связанным списком) или даже 1 и 2 в атомных операциях, я не вижу, чтобы вы делали все это в одной атомной операции:).

Однако вы можете попытаться сбросить "длину", если ваши потребители очень умны и всегда будут знать, что читать. Вам также понадобится новая переменная woffset, потому что старый метод (offset + length)% size для определения смещения записи больше не будет работать. Обратите внимание, что это близко к случаю связанного списка, где вы на самом деле всегда читаете один элемент (= фиксированный, известный размер) из списка. Также здесь, если вы сделаете его круговым связанным списком, вы можете много читать или писать в позицию, которую вы читаете в этот момент!

Наконец: мой совет, просто зайдите с помощью замков, я использую класс CircularBuffer, полностью безопасный для чтения и записи) для видеопотока в реальном времени 720p60, и у меня нет никаких проблем со скоростью от блокировки.