Подтвердить что ты не робот

Существует ли для С++ многопользовательская одиночная потребительская очередь без ограничений?

Чем больше я читаю, тем более запутанным становится... Я бы подумал, что тривиально найти формально правильную очередь mpsc, реализованную в С++.

Каждый раз, когда я нахожу еще один удар, дальнейшие исследования, похоже, предполагают наличие таких проблем, как ABA или другие тонкие условия гонки.

Многие говорят о необходимости сбора мусора. Этого я хочу избежать.

Есть ли приемлемая корректная реализация с открытым исходным кодом?

4b9b3361

Ответ 1

Возможно, вы захотите проверить неисправность; он доступен на С++ здесь: http://lmax-exchange.github.io/disruptor/

Вы также можете найти объяснение, как это работает fooobar.com/questions/33207/.... В основном это круговой буфер без блокировки, оптимизированный для передачи сообщений FIFO между потоками в слотах фиксированного размера.

Вот две реализации, которые мне сочли полезными: Бесключевая многопрофильная многопользовательская очередь на кольцевом буфере @NatSys Lab. Блог и
Еще одна реализация замкнутой кольцевой решетки  @CodeProject

ПРИМЕЧАНИЕ: приведенный ниже код неверен, я оставляю его только в качестве примера, насколько сложными могут быть эти вещи.

Если вам не нравится сложность версии google, вот что-то похожее от меня - это намного проще, но я оставляю это как упражнение для читателя, чтобы заставить его работать (он является частью более крупного проекта, а не переносится на момент). Вся идея состоит в том, чтобы поддерживать cirtular buffer для данных и небольшой набор счетчиков для идентификации слотов для записи/записи и чтения/чтения. Поскольку каждый счетчик находится в собственной строке кэша, и (как правило) каждый из них обновляется только один раз в реальном времени сообщения, все они могут быть прочитаны без какой-либо синхронизации. Существует одна потенциальная конфликтная точка между написанием потоков в post_done, которая требуется для гарантии FIFO. Для обеспечения правильности и FIFO были выбраны счетчики (head_, wrtn_, rdng_, tail_), поэтому для сброса FIFO также потребуется смена счетчиков (и это может быть трудно обойтись без сатирической корректности). Можно немного улучшить производительность для сценариев с одним потребителем, но я бы не стал беспокоиться - вам придется отменить его, если будут найдены другие варианты использования с несколькими читателями.

На моей машине латентность выглядит следующим образом (процентиль слева, средний в пределах этого процентиля справа, единица - микросекунда, измеренная rdtsc):

    total=1000000 samples, avg=0.24us
    50%=0.214us, avg=0.093us
    90%=0.23us, avg=0.151us
    99%=0.322us, avg=0.159us
    99.9%=15.566us, avg=0.173us

Эти результаты относятся к потребителю с одним опросом, то есть к рабочему потоку, вызывающему wheel.read() в узком цикле и проверке, если он не пуст (например, прокрутите вниз). Ожидание потребителей (значительно меньшее использование ЦП) ожидало бы событие (одна из функций acquire...), это добавляет примерно 1-2% к средней задержке из-за переключения контекста.

Поскольку на чтении имеется довольно мало раздумий, потребители очень хорошо оценивают количество рабочих потоков, например. для 3 потоков на моей машине:

    total=1500000 samples, avg=0.07us
    50%=0us, avg=0us
    90%=0.155us, avg=0.016us
    99%=0.361us, avg=0.038us
    99.9%=8.723us, avg=0.044us

Патчи приветствуются:)

// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <core/api.hxx>
#include <core/wheel/exception.hxx>

#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>

namespace core { namespace wheel
{
  struct bad_size : core::exception
  {
    template<typename T> explicit bad_size(const T&, size_t m)
      : core::exception(std::string("Slot capacity exceeded, sizeof(")
                  + typeid(T).name()
                  + ") = "
                  + boost::lexical_cast<std::string>(sizeof(T))
                  + ", capacity = "
                  + boost::lexical_cast<std::string>(m)
                  )
    {}
  };        

  // inspired by Disruptor
  template <typename Header>
  class wheel : boost::noncopyable
  {
    __declspec(align(64))
    struct slot_detail
    {
      // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
      // slot read:  (memory barrier in wheel) > read_done > (memory barrier in wheel)

      // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
      template <bool Writing>
      void done(wheel* w)
      {
        if (Writing)
          w->post_done(sequence);
        else
          w->read_done();
      }

      // cache line for sequence number and header
      long long sequence;
      Header header;

      // there is no such thing as data type with variable size, but we need it to avoid thrashing
      // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
      // This is well into UB territory! Using template parameter for this is not good, since it
      // results in this small implementation detail leaking to all possible user interfaces.
      __declspec(align(8))
      char data[8];
    };

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment
    _declspec(align(64))
    struct slot_block { long long padding[8]; };

  public:
    // wrap slot data to outside world
    template <bool Writable>
    class slot
    {
      template<typename> friend class wheel;

      slot& operator=(const slot&); // moveable but non-assignable

      // may only be constructed by wheel
      slot(slot_detail* impl, wheel<Header>* w, size_t c)
        : slot_(impl) , wheel_(w) , capacity_(c)
      {}

    public:
      slot(slot&& s)
        : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
      {
        s.slot_ = NULL;
      }

      ~slot()
      {
        if (slot_)
        {
          slot_->done<Writable>(wheel_);
        }
      }

      // slot accessors - use Header to store information on what type is actually stored in data
      bool empty() const          { return !slot_; }
      long long sequence() const  { return slot_->sequence; }
      Header& header()            { return slot_->header; }
      char* data()                { return slot_->data; }

      template <typename T> T& cast()
      {
        static_assert(boost::is_pod<T>::value, "Data type must be POD");
        if (sizeof(T) > capacity_)
          throw bad_size(T(), capacity_);
        if (empty())
          throw no_data();
        return *((T*) data());
      }

    private:
      slot_detail*    slot_;
      wheel<Header>*  wheel_;
      const size_t    capacity_;
    };

  private:
    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks
    static size_t sizeof_slot(size_t s)
    {
      size_t m = sizeof(slot_detail);
      // add capacity less 8 bytes already within sizeof(slot_detail)
      m += max(8, s) - 8;
      // round up to 64 bytes, i.e. alignment of slot_detail
      size_t r = m & ~(unsigned int)63;
      if (r < m)
        r += 64;
      r /= 64;
      return r;
    }

    // calculate actual slot capacity back from number of 64 byte blocks
    static size_t slot_capacity(size_t s)
    {
      return s*64 - sizeof(slot_detail) + 8;
    }

    // round up to power of 2
    static size_t round_size(size_t s)
    {
      // enfore minimum size
      if (s <= min_size)
        return min_size;

      // find rounded value
      --s;
      size_t r = 1;
      while (s)
      {
        s >>= 1;
        r <<= 1;
      };
      return r;
    }

    slot_detail& at(long long sequence)
    {
      // find index from sequence number and return slot at found index of the wheel
      return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
    }

  public:
    wheel(size_t capacity, size_t size)
      : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
      , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
    {
      static_assert(boost::is_pod<Header>::value, "Header type must be POD");
      static_assert(sizeof(slot_block) == 64, "This was unexpected");

      wheel_ = new slot_block[size_ * blocks_];
      // all slots must be initialised to 0
      memset(wheel_, 0, size_ * 64 * blocks_);
      active_ = 1;
    }

    ~wheel()
    {
      stop();
      delete[] wheel_;
    }

    // all accessors needed
    size_t capacity() const { return capacity_; }   // capacity of a single slot
    size_t size() const     { return size_; }       // number of slots available
    size_t queue() const    { return (size_t)head_ - (size_t)tail_; }
    bool active() const     { return active_ == 1; }

    // enough to call it just once, to fine tune slot capacity
    template <typename T>
    void check() const
    {
      static_assert(boost::is_pod<T>::value, "Data type must be POD");
      if (sizeof(T) > capacity_)
        throw bad_size(T(), capacity_);
    }

    // stop the wheel - safe to execute many times
    size_t stop()
    {
      InterlockedExchange(&active_, 0);
      // must wait for current read to complete
      while (rdng_ != tail_)
        Sleep(10);

      return size_t(head_ - tail_);
    }

    // return first available slot for write
    slot<true> post()
    {
      if (!active_)
        throw stopped();

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          throw overflowing();

        // protection against case of race condition when we are overflowing
        // and two or more threads try to post and two or more messages are read,
        // all at the same time. If this happens we must re-try, otherwise we
        // could have skipped a sequence number - causing infinite wait in post_done
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // return first available slot for write, nothrow variant
    slot<true> post(std::nothrow_t)
    {
      if (!active_)
        return slot<true>(NULL, this, capacity_);

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
      {
        if (InterlockedDecrement64(&head_) == h - 1)
          return slot<true>(NULL, this, capacity_);

        // must retry if race condition described above
        Sleep(0);
        h = InterlockedIncrement64(&head_);
      }

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);
    }

    // read first available slot for read
    slot<false> read()
    {
      slot_detail* r = NULL;
      // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
      if (active_ && rdng_ < wrtn_)
      {
        // the only memory barrier on reading seq. number we need
        const long long h = InterlockedIncrement64(&rdng_);
        // check if this slot has been written, step back if not
        if (h > wrtn_)
          InterlockedDecrement64(&rdng_);
        else
          r = &at(h);
      }

      // wrap in readable slot
      return slot<false>(r , this, capacity_);
    }

    // waiting for new post, to be used by non-polling clients
    void acquire()
    {
      event_.acquire();
    }

    bool try_acquire()
    {
      return event_.try_acquire();
    }

    bool try_acquire(unsigned long timeout)
    {
      return event_.try_acquire(timeout);
    }

    void release()
    {}

  private:
    void post_done(long long sequence)
    {
      const long long t = sequence - 1;

      // the only memory barrier on written seq. number we need
      while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
        Sleep(0);

      // this is outside of critical path for polling clients
      event_.set();
    }

    void read_done()
    {
      // the only memory barrier on tail seq. number we need
      InterlockedIncrement64(&tail_);
    }

    // each in its own cache line
    // head_ - wrtn_ = no. of messages being written at this moment
    // rdng_ - tail_ = no. of messages being read at the moment
    // head_ - tail_ = no. of messages to read (including those being written and read)
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
    __declspec(align(64)) volatile long long head_; // currently writing or written
    __declspec(align(64)) volatile long long wrtn_; // written
    __declspec(align(64)) volatile long long rdng_; // currently reading or read
    __declspec(align(64)) volatile long long tail_; // read
    __declspec(align(64)) volatile long active_;    // flag switched to 0 when stopped

    __declspec(align(64))
    api::event event_;          // set when new message is posted
    const size_t blocks_;       // number of 64-byte blocks in a single slot_detail
    const size_t capacity_;     // capacity of data() section per single slot. Initialisation depends on blocks_
    const size_t size_;         // number of slots available, always power of 2
    slot_block* wheel_;
  };
}}

Вот что может выглядеть опрос пользователя потребительского потока:

  while (wheel.active())
  {
    core::wheel::wheel<int>::slot<false> slot = wheel.read();
    if (!slot.empty())
    {
      Data& d = slot.cast<Data>();
      // do work
    }
    // uncomment below for waiting consumer, saving CPU cycles
    // else
    //   wheel.try_acquire(10);
  }

Отредактированный добавленный пример потребителя

Ответ 2

Наиболее подходящая реализация зависит от желаемых свойств очереди. Должен ли он быть неограниченным или ограниченным, хорошо? Должно ли быть линеаризуемым, или менее строгие требования были бы точными? Насколько сильный FIFO вам нужен? Готовы ли вы оплатить стоимость возврата списка потребителем (существует очень простая реализация, когда потребитель захватывает хвост односвязного списка, таким образом, сразу же все предметы, поставленные производителями)? Должен ли он гарантировать, что нить никогда не заблокирована, или крошечные шансы на то, что какая-то нить заблокирована, в порядке? И т.д.

Некоторые полезные ссылки:
Возможно ли использование нескольких производителей с одним потребителем в режиме блокировки? http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3

Надеюсь, что это поможет.

Ответ 3

Я предполагаю, что такая вещь не существует - и если это так, она либо не переносима, либо не является открытым исходным кодом.

Концептуально вы пытаетесь одновременно управлять двумя указателями: указателем tail и указателем tail->next. Обычно это невозможно сделать с помощью просто незакрепленных примитивов.

Ответ 4

Ниже приведен метод, который я использовал для моей совместной многозадачной/многопоточной библиотеки (MACE) http://bytemaster.github.com/mace/. Он имеет преимущество блокировки, за исключением случаев, когда очередь пуста.

struct task {
   boost::function<void()> func;
   task* next;
};


boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue;

// this can be called from any thread
void thread::post_task( task* t ) {
     // atomically post the task to the queue.
     task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
     do { t->next = stale_head;
     } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Because only one thread can post the 'first task', only that thread will attempt
   // to aquire the lock and therefore there should be no contention on this lock except
   // when *this thread is about to block on a wait condition.  
    if( !stale_head ) { 
        boost::unique_lock<boost::mutex> lock(task_ready_mutex);
        task_ready.notify_one();
    }
}

// this is the consumer thread.
void process_tasks() {
  while( !done ) {
   // this will atomically pop everything that has been posted so far.
   pending = task_in_queue.exchange(0,boost::memory_order_consume);
   // pending is a linked list in 'reverse post order', so process them
   // from tail to head if you want to maintain order.

   if( !pending ) { // lock scope
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);                
      // check one last time while holding the lock before blocking.
      if( !task_in_queue ) task_ready.wait( lock );
   }
 }