Подтвердить что ты не робот

Самая быстрая встроенная спин-блокировка

Я пишу многопоточное приложение на С++, где производительность критическая. Мне нужно использовать много блокировки при копировании небольших структур между потоками, для этого я выбрал использование штифтов.

Я провел некоторое исследование и тестирование скорости на этом, и я обнаружил, что большинство реализаций примерно одинаково быстрые:

  • Microsoft CRITICAL_SECTION, с установленным значением SpinCount до 1000, насчитывает около 140 единиц времени.
  • Реализация этот алгоритм с Microsoft InterlockedCompareExchange насчитывает около 95 единиц времени
  • Ive также попытался использовать некоторую встроенную сборку с __asm {}, используя что-то вроде этого кода, и она насчитывает около 70 единиц времени, , но я не уверен что был создан правильный барьер памяти.

Изменить: время, указанное здесь, - это время, затрачиваемое на 2 потока для блокировки и разблокировки спин-блокировки в 1 000 000 раз.

Я знаю, что это не большая разница, но поскольку спин-блокировка - это сильно используемый объект, можно подумать, что программисты согласились бы на самый быстрый способ сделать спин-блокировку. Однако поиск в Google приводит к множеству различных подходов. Я бы подумал, что этот вышеупомянутый метод будет самым быстрым, если он будет реализован с использованием встроенной сборки и с использованием команды CMPXCHG8B вместо сравнения 32-разрядных регистров. Кроме того, необходимо учитывать барьеры памяти, это может быть сделано LOCK CMPXHG8B (я думаю?), что гарантирует "исключительные права" для общей памяти между ядрами. Наконец, [некоторые подсказывают], что для ожидания заняты должны сопровождаться NOP: REP, которые позволят процессорам Hyper-threading переключиться на другой поток, но я не уверен, это правда или нет?

Из моего теста производительности различных шпилек, видно, что нет большой разницы, но для чисто академических целей я хотел бы узнать, какой из них наиболее быстрый. Однако, поскольку у меня очень ограниченный опыт на языке ассемблера и с барьерами памяти, я был бы счастлив, если бы кто-нибудь смог написать код сборки для последнего примера, который я предоставил LOCK CMPXCHG8B и надлежащие барьеры памяти в следующий шаблон:

__asm
{
     spin_lock:
         ;locking code.
     spin_unlock:
         ;unlocking code.
}
4b9b3361

Ответ 1

Просто посмотрите здесь: x86 spinlock с помощью cmpxchg

И благодаря Кори Нельсон

__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret

spin_unlock:
movl $0 (lock_addr)
ret
}

И еще один источник говорит: http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

       lock    cmpxchg8b qword ptr [esi]
is replaceable with the following sequence

try:
        lock    bts dword ptr [edi],0
        jnb     acquired
wait:
        test    dword ptr [edi],1
        je      try
        pause                   ; if available
        jmp     wait

acquired:
        cmp     eax,[esi]
        jne     fail
        cmp     edx,[esi+4]
        je      exchange

fail:
        mov     eax,[esi]
        mov     edx,[esi+4]
        jmp     done

exchange:
        mov     [esi],ebx
        mov     [esi+4],ecx

done:
        mov     byte ptr [edi],0

А вот обсуждение блокировок и блокировок: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

Ответ 2

Хотя уже есть принятый ответ, есть несколько вещей, которые пропустили, что можно было бы использовать для улучшения всех ответов, взятых из этой статьи Intel, все вышеперечисленное быстрое выполнение блокировки:

  • Спрячьте на изменчивой прочитанной, а не на атомной инструкции, это позволяет избежать ненужной блокировки шины, особенно на сильно разрешенных блокировках.
  • Использовать откат для сильно оспариваемых блокировок
  • Встроить блокировку, желательно с встроенными для компиляторов, где inline asm вреден (в основном MSVC).

Ответ 3

В Википедии есть хорошая статья о spindlock, вот реализация x86

http://en.wikipedia.org/wiki/Spinlock#Example_implementation

Обратите внимание, что в их реализации не используется префикс "блокировка", поскольку он избыточен на x86 для инструкции "xchg" - он неявно имеет семантику блокировки, как обсуждалось в этом обсуждении в Stackoverflow:

В многоядерном x86, LOCK необходимо как префикс для XCHG?

REP: NOP является псевдонимом для инструкции PAUSE, вы можете узнать больше об этом здесь.

Как инструкция x86 pause работает в spinlock * и *, может ли она использоваться в других сценариях?

В вопросе о барьерах памяти здесь все, что вы, возможно, захотите узнать

Пределы памяти: аппаратное обеспечение для программных хакеров Paul E. McKenney

http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf

Ответ 4

Я, как правило, не один, чтобы схватить кого-то, стремящегося достичь быстрого кода: это обычно очень хорошее упражнение, которое приводит к лучшему пониманию программирования и более быстрого кода.

Я тоже не буду спорить, но я могу однозначно заявить, что вопрос о быстрой спин-блокировке 3 инструкции длинный или еще несколько - по крайней мере, в архитектуре x86 - бесполезная погоня.

Вот почему:

Вызов спин-блокировки с типичной кодовой последовательностью

lock_variable DW 0    ; 0 <=> free

mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]

; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't

Освобождение спин-блокировки тривиально

mov ebx,offset lock_variable
mov dword ptr [ebx],0

Инструкция xchg поднимает контакт блокировки на процессоре, что фактически означает, что я хочу, чтобы шина находилась в течение следующих нескольких тактов. Этот сигнал прокладывает свой путь через кеши и вплоть до самого медленного устройства для шинирования, которое обычно является шиной PCI. Когда все устройства управления шиной завершены, сигнал блокировки (блокировки) посылается обратно. Затем происходит фактический обмен. Проблема в том, что последовательность lock/locka занимает ОЧЕНЬ долгое время. Шина PCI может работать на частоте 33 МГц с несколькими циклами задержки. На CPU с тактовой частотой 3,3 ГГц, что означает, что каждый цикл шины PCI занимает около ста циклов процессора.

Как правило, я предполагаю, что блокировка займет от 300 до 3000 циклов ЦП для завершения, и в конце концов я не знаю, смогу ли я иметь замок. Таким образом, несколько циклов, которые вы можете сохранить с помощью "быстрой" спин-блокировки, будут миражом, потому что никакой блокировки не будет похож на следующий, это будет зависеть от вашей ситуации в автобусе за это короткое время.

________________ ________________ EDIT

Я только что прочитал, что спин-блокировка - это "сильно используемый объект". Ну, вы, очевидно, не понимаете, что спин-блокировка потребляет огромное количество циклов процессора за время, когда оно вызывается. Или, говоря другим способом, каждый раз, когда вы его вызываете, вы теряете значительную часть своих возможностей обработки.

Трюк при использовании штифтов (или их более крупного брата, критического раздела) заключается в том, чтобы использовать их как можно более экономно, сохраняя при этом целевую программную функцию. В результате их использование повсеместно легко, и в результате вы получите тусклую производительность.

Это не все о написании быстрого кода, а также об организации ваших данных. Когда вы пишете "копирование небольших структур между потоками", вы должны понимать, что блокировка может занять сотни раз дольше, чем фактическое копирование.

________________ ________________ EDIT

Когда вы вычисляете среднее время блокировки, оно, вероятно, будет говорить очень мало, поскольку оно измеряется на вашем компьютере, что может не быть целью (которая может иметь совершенно разные характеристики использования шины). Для вашей машины среднее значение будет составлено из отдельных очень быстрых периодов времени (когда деятельность по освоению шины не мешала) вплоть до очень медленных времен (когда помехи, связанные с шинами, были значительными).

Вы можете ввести код, который определяет самые быстрые и медленные случаи и рассчитать коэффициент, чтобы увидеть, насколько сильно время спин-блокировки может меняться.

________________ ________________ EDIT

Обновление от мая 2016 года.

Питер Кордес выдвинул идею о том, что "настройка блокировки в неподтвержденном случае может иметь смысл", и время блокировки многих сотен тактовых циклов не происходит на современных процессорах, за исключением ситуаций, когда переменная блокировки смещена. Я начал задаваться вопросом, может ли моя предыдущая тестовая программа , написанная в 32-битном Watcom C, может быть затруднена WOW64, поскольку она была запущена в 64-разрядной ОС: Windows 7.

Итак, я написал 64-битную программу и скомпилировал ее с TDM gcc 5.3. Программа использует неявный вариант команды блокировки шины "XCHG r, m" для блокировки и простое назначение "MOV m, r" для разблокировки. В некоторых вариантах блокировки переменная блокировки была предварительно протестирована, чтобы определить, возможно ли даже пытаться заблокировать (используя простой сэмпл "CMP r, m", возможно, не задумываясь за пределами L3). Вот он:

// compiler flags used:

// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0

#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD

typedef unsigned char      u1;
typedef unsigned short     u2;
typedef unsigned long      u4;
typedef unsigned int       ud;
typedef unsigned long long u8;
typedef   signed char      i1;
typedef          short     i2;
typedef          long      i4;
typedef          int       id;
typedef          long long i8;
typedef          float     f4;
typedef          double    f8;

#define usizeof(a) ((ud)sizeof(a))

#define LOOPS 25000000

#include <stdio.h>
#include <windows.h>

#ifndef bool
typedef signed char bool;
#endif

u8 CPU_rdtsc (void)
{
  ud tickl, tickh;
  __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
  return ((u8)tickh << 32)|tickl;
}

volatile u8 bus_lock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");

  return value;
}

void bus_unlock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}

void rfence (void)
{
  __asm__ __volatile__( "lfence" : : : "memory");
}

void rwfence (void)
{
  __asm__ __volatile__( "mfence" : : : "memory");
}

void wfence (void)
{
  __asm__ __volatile__( "sfence" : : : "memory");
}

volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
  return (bool)(*lockVariablePointer == 0ull);
}

volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
  return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}

void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
  *lockVariablePointer = 0ull;
}

static volatile u8 lockVariable = 0ull,
                   lockCounter =  0ull;

static volatile i8 threadHold = 1;

static u8 tstr[4][32];    /* 32*8=256 bytes for each thread parameters should result in them residing in different cache lines */

struct LOCKING_THREAD_STRUCTURE
{
  u8 numberOfFailures, numberOfPreTests;
  f8 clocksPerLock, failuresPerLock, preTestsPerLock;
  u8 threadId;
  HANDLE threadHandle;
  ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};

DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
  ud n = LOOPS;
  u8 clockCycles;

  SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);

  while (threadHold) {}

  clockCycles = CPU_rdtsc ();
  while (n)
  {
    Sleep (0);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    ++lockCounter;
    LOCK_spinlockLeave (&lockVariable);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    --lockCounter;
    LOCK_spinlockLeave (&lockVariable);

    n-=2;
  }
  clockCycles = CPU_rdtsc ()-clockCycles;

  ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
  ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
  ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;

//rwfence ();

  ltsp->idx = 4u;

  ExitThread (0);
  return 0;
}

int main (int argc, char *argv[])
{
  u8 processAffinityMask, systemAffinityMask;

  memset (tstr, 0u, usizeof(tstr));

  lts[0]->idx = 3;
  lts[1]->idx = 2;
  lts[2]->idx = 1;
  lts[3]->idx = 0;

  GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);

  SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
  SetThreadAffinityMask (GetCurrentThread (), 1ull);

  lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
  lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
  lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
  lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif

  SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);

  threadHold = 0;

#ifdef SINGLE_THREAD
  while (lts[0]->idx<4u) {Sleep (1);}
#else
  while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif

  printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
  printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
  printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
  printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);

  printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
                                    (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
                                    (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);

  printf ("LC:%u\n", (ud)lockCounter);

  return 0;
}

Программа запускалась на компьютере под управлением DELL i5-4310U с DDR3-800, 2 ядрами /2 HT с 2,7 ГГц и общим кэшем L3.

Начнем с того, что влияние WOW64 было незначительным.

Один поток, выполняющий незащищенные блокировки/разблокировки, смог сделать это каждые 110 циклов. Настройка неуправляемой блокировки бесполезна: любой код, добавленный для улучшения одной инструкции XCHG, сделает ее медленнее.

С четырьмя HT, бомбардирующими переменную блокировки с попытками блокировки, ситуация радикально меняется. Время, необходимое для достижения успешной блокировки, переходит в 994 цикла, из которых значительная часть может быть отнесена к неудачным попыткам блокировки 2.2. Иными словами, в ситуации с высоким уровнем конкуренции в среднем 3.2 блокировки должны быть предприняты для достижения успешного блокирования. Очевидно, что 110 циклов не стали 110 * 3,2, но ближе к 110 * 9. Таким образом, здесь действуют другие механизмы, как и тесты на старой машине. Кроме того, в среднем 994 цикла охватывают диапазон между 716 и 1157

Варианты блокировки, реализующие предварительное тестирование, потребовали около 95% циклов, которые были запрошены с помощью простейшего варианта (XCHG). В среднем они выполнили бы 17 CMP, чтобы найти возможность выполнить 1.75 блокировок, из которых 1 был успешным. Я рекомендую использовать предварительное тестирование не только потому, что он быстрее: он накладывает меньше нагрузки на механизм блокировки шины (3.2-1.75 = 1.45 меньше попыток блокировки), хотя это немного увеличивает сложность.

Ответ 5

Просто спрашиваю:

Прежде чем вы выкопать эту глубину в прямую блокировку и почти блокированные структуры данных:

У вас - в ваших тестах и ​​вашем приложении - убедитесь, что конкурирующие потоки гарантированно работают на разных ядрах?

Если это не так, вы можете закончить программу, которая отлично работает на вашей машине разработки, но в ней сложна/неудачна, потому что один поток должен быть как раздевальщиком, так и разблокировщиком вашей спин-блокировки.

Чтобы дать вам рисунок: В Windows у вас есть стандартный срез времени 10 миллисекунд. Если вы не убедитесь, что в блокировку/разблокировку задействованы два физических потока, вы получите около 500 блокировок/разблокировок в секунду, и этот результат будет очень meh