Подтвердить что ты не робот

Реализация concurrent_vector по версии intel blog

Я пытаюсь реализовать поточно-безопасный контейнер без блокировки, аналогичный std::vector, в соответствии с этим https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organization

Из того, что я понял, чтобы предотвратить перераспределение и недействительность всех итераторов на всех потоках, вместо одного непрерывного массива, они добавляют новые смежные блоки.
Каждый добавляемый блок имеет размер возрастающих степеней 2, поэтому они могут использовать log (index), чтобы найти правильный сегмент, где должен находиться элемент в [index].

Из того, что я собираю, у них есть статический массив указателей на сегменты, поэтому они могут быстро получить к ним доступ, однако они не знают, сколько сегментов пользователь хочет, поэтому они сделали небольшую начальную, и если количество сегменты превышают текущий счет, они выделяют огромный и переключаются на использование этого.

Проблема заключается в том, что добавление нового сегмента невозможно в безопасном потоке без ограничений или, по крайней мере, я не понял, как это сделать. Я могу атомарно увеличивать текущий размер, но только это.
А также переход от малого к большому массиву указателей сегмента включает в себя большое выделение и копии памяти, поэтому я не могу понять, как они это делают.

У них есть код, размещенный в Интернете, но все важные функции не имеют исходного кода, они находятся в их DLL Building Building Blocks. Вот несколько примеров, которые демонстрируют проблему:

template<typename T>
class concurrent_vector
{
    private:
        int size = 0;
        int lastSegmentIndex = 0;

        union
        {
            T* segmentsSmall[3];
            T** segmentsLarge;
        };

        void switch_to_large()
        {
            //Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it
        }

    public:
        concurrent_vector()
        {
            //The initial array is contiguous just for the sake of cache optimization
            T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
            segmentsSmall[0] = initialContiguousBlock;
            segmentsSmall[1] = initialContiguousBlock + 2;
            segmentsSmall[2] = initialContiguousBlock + 2 + 4;
        }

        void push_back(T& item)
        {
            if(size > 2 + 4 + 8)
            {
                switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies.
            }

            InterlockedIncrement(&size); //Ok, so size is atomically increased

            //afterwards adds the item to the appropriate slot in the appropriate segment
        }
};
4b9b3361

Ответ 1

Я бы не попытался сделать объединение segmentsLarge и segmentsSmall. Да, это отвлекает еще один указатель. Тогда указатель, позволяет называть его просто segments, может первоначально указывать на сегментыSmall.

С другой стороны, другие методы всегда могут использовать один и тот же указатель, который упрощает их.

И переключение с малого на большое может быть достигнуто путем сравнения обменного указателя.

Я не уверен, как это можно сделать безопасно с помощью объединения.

Идея будет выглядеть примерно так (обратите внимание, что я использовал С++ 11, который предшествовал библиотеке Intel, поэтому они, вероятно, сделали это с их атомными внутренними функциями). Это, вероятно, пропустит немало деталей, которые, я уверен, люди Intel думали больше, поэтому вам, скорее всего, придется это проверить против реализаций всех других методов.

#include <atomic>
#include <array>
#include <cstddef>
#include <climits>

template<typename T>
class concurrent_vector
{
private:
  std::atomic<size_t> size;
  std::atomic<T**> segments;
  std::array<T*, 3> segmentsSmall;
  unsigned lastSegmentIndex = 0;

  void switch_to_large()
  {
    T** segmentsOld = segments;
    if( segmentsOld == segmentsSmall.data()) {
      // not yet switched
      T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT];
      // note that we leave the original segment allocations alone and just copy the pointers
      std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge);
      for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) {
        segmentsLarge = nullptr;
      }
      // now both the old and the new segments array are valid
      if( segments.compare_exchange_strong(segmentsOld, segmentsLarge)) {
        // success!
        return;
      }  else {
        // already switched, just clean up
        delete[] segmentsLarge;
      }
    }
  }

public:
  concurrent_vector()  : size(0), segments(segmentsSmall.data())
  {
    //The initial array is contiguous just for the sake of cache optimization
    T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
    segmentsSmall[0] = initialContiguousBlock;
    segmentsSmall[1] = initialContiguousBlock + 2;
    segmentsSmall[2] = initialContiguousBlock + 2 + 4;
  }

  void push_back(T& item)
  {
    if(size > 2 + 4 + 8) {
      switch_to_large();
    }
    // here we may have to allocate more segments atomically
    ++size;

    //afterwards adds the item to the appropriate slot in the appropriate segment
  }
};