Подтвердить что ты не робот

Почему Monitor.PulseAll приводит к шаблону задержки "ступенчатой ​​лестницы" в сигнальных потоках?

В библиотеке, использующей Monitor.PulseAll() для синхронизации потоков, я заметил, что латентность с момента появления PulseAll (...) вызывает время, когда поток проснулся, похоже, следует за распределением "ступенчатой ​​лестницы" - - с чрезвычайно большими шагами. Разбуженные нити почти не работают; и почти сразу возвращаются к ожиданию на мониторе. Например, на ящике с 12 ядрами с 24 потоками, ожидающими на мониторе (2x Xeon5680/Gulftown, 6 физических ядер на процессор, HT Disabled), латентность между импульсом и пробуждением нити такова:

Latency using Monitor.PulseAll(); 3rd party library

Первые 12 потоков (обратите внимание, что у нас есть 12 ядер) требуется от 30 до 60 микросекунд, чтобы ответить. Затем мы начинаем получать очень большие прыжки; с плато около 700, 1300, 1900 и 2600 микросекунд.

Я смог успешно воссоздать это поведение независимо от сторонней библиотеки, используя приведенный ниже код. Что делает этот код, так это запуск большого количества потоков (изменение параметра numThreads), которые просто ждут на мониторе, читают временную метку, регистрируют ее в ConcurrentSet, а затем сразу возвращаются к ожиданию. Как только второй PulseAll() просыпает все потоки. Он делает это 20 раз и сообщает о задержках для 10-й итерации консоли.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace PulseAllTest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static object SyncObj = new object();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 32;

            for (int i = 0; i < numThreads; ++i)
            {
                Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
            }

            s.Start();
            for (int i = 0; i < 20; ++i)
            {
                lock (SyncObj)
                {
                    ++Iteration;
                    LastTimestamp = s.Elapsed.Ticks;
                    Monitor.PulseAll(SyncObj);
                }
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 
                    select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        static void ReadLastTimestampAndPublish()
        {
            while(true)
            {
                lock(SyncObj)
                {
                    Monitor.Wait(SyncObj);
                }
                IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
            }
        }
    }
}

Используя приведенный выше код, приведен пример латентности в ящике с включенным hyperthreading с 8 ядер /w (т.е. 16 ядер в диспетчере задач) и 32 потока (* 2x Xeon5550/Gainestown, 4 физических ядра на процессор, HT Enabled ):

Latency using Monitor.PulseAll(), sample code

РЕДАКТИРОВАТЬ: Чтобы попытаться вывести NUMA из уравнения, ниже приведен график, на котором выполняется примерная программа с 16 потоками на Core i7-3770 (Ivy Bridge); 4 физических ядра; HT Включено:

Latency using Monitor.PulseAll(), sample code, no NUMA

Может ли кто-нибудь объяснить, почему Monitor.PulseAll() ведет себя таким образом?

EDIT2:

Чтобы попытаться показать, что это поведение не является неотъемлемой частью пробуждения кучи потоков одновременно, я реплицировал поведение тестовой программы с помощью событий; и вместо измерения латентности PulseAll() я измеряю латентность ManualResetEvent.Set(). Код создает несколько рабочих потоков, а затем ожидает событие ManualResetEvent.Set() на том же объекте ManualResetEvent. Когда событие запускается, они берут измерение задержки, а затем сразу же ждут своего собственного отдельного потока AutoResetEvent. До следующей итерации (до 500 мс) ManualResetEvent имеет значение Reset(), а затем каждый параметр AutoResetEvent - Set(), поэтому потоки могут вернуться к ожиданию совместно используемого ManualResetEvent.

Я не решался опубликовать это, потому что это может быть гигантский красный слух (я не делаю никаких претензий к событиям и мониторам, которые ведут себя аналогичным образом), плюс он использует некоторые абсолютно ужасные методы, чтобы заставить Событие вести себя как Монитор (я бы любил/ненавидел посмотрите, что сделают мои коллеги, если я представим это в обзор кода); но я думаю, что результаты являются просветляющими.

Этот тест проводился на той же машине, что и исходный тест; 2xXeon5680/Gulftown; 6 ядер на процессор (всего 12 ядер); Hyperthreading отключен.

ManualResetEventLatency

Если не ясно, насколько это радикально отличается от Monitor.PulseAll; вот первый граф, наложенный на последний граф:

ManualResetEventLatency vs. Monitor Latency

Код, используемый для создания этих измерений, приведен ниже:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace MRETest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
        static List<ReadLastTimestampAndPublish> Publishers = 
            new List<ReadLastTimestampAndPublish>();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = 
            new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 24;
            s.Start();

            for (int i = 0; i < numThreads; ++i)
            {
                AutoResetEvent ares = new AutoResetEvent(false);
                ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
                    new AutoResetEvent(false));
                Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
                Publishers.Add(spinner);
            }

            for (int i = 0; i < 20; ++i)
            {
                ++Iteration;
                LastTimestamp = s.Elapsed.Ticks;
                MRES.Set();
                Thread.Sleep(500);
                MRES.Reset();
                foreach (ReadLastTimestampAndPublish publisher in Publishers)
                {
                    publisher.ARES.Set();
                }
                Thread.Sleep(500);
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
                    select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        class ReadLastTimestampAndPublish
        {
            public AutoResetEvent ARES { get; private set; }

            public ReadLastTimestampAndPublish(AutoResetEvent ares)
            {
                this.ARES = ares;
            }

            public void Spin()
            {
                while (true)
                {
                    MRES.Wait();
                    IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
                    ARES.WaitOne();
                }
            }
        }
    }
}
4b9b3361

Ответ 1

Одна разница между этими версиями заключается в том, что в случае PulseAll - потоки немедленно повторяют цикл, снова блокируя объект.

У вас есть 12 ядер, поэтому выполняется 12 потоков, выполняется цикл и снова вводится цикл, блокировка объекта (один за другим), а затем вход в состояние ожидания. Все это время ожидают другие потоки. В случае с ManualEvent у вас есть два события, поэтому потоки не сразу повторяют цикл, но вместо этого блокируются в событиях ARES - это позволяет другим потокам быстрее блокировать блокировку.

Я моделировал подобное поведение в PulseAll, добавляя спать в конце цикла в ReadLastTimestampAndPublish. Это позволяет другому потоку блокировать syncObj быстрее и, по-видимому, улучшать числа, которые я получаю от программы.

static void ReadLastTimestampAndPublish()
{
    while(true)
    {
        lock(SyncObj)
        {
            Monitor.Wait(SyncObj);
        }
        IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
        Thread.Sleep(TimeSpan.FromMilliseconds(100));   // <===
    }
}

Ответ 2

Чтобы начать, это не ответ, просто мои заметки, глядя на SSCLI, чтобы точно выяснить, что происходит. Большинство из них значительно выше моей головы, но тем не менее интересно.

Отключение кроличьей лунки начинается с вызова Monitor.PulseAll, который реализован в С#:

clr\src\bcl\system\threading\monitor.cs:

namespace System.Threading
{
    public static class Monitor 
    {
        // other methods omitted

        [MethodImplAttribute(MethodImplOptions.InternalCall)]
        private static extern void ObjPulseAll(Object obj);

        public static void PulseAll(Object obj)
        {
            if (obj==null) {
                throw new ArgumentNullException("obj");
            }

            ObjPulseAll(obj);
        } 
    }
}

Методы InternalCall маршрутизируются в clr\src\vm\ecall.cpp:

FCFuncStart(gMonitorFuncs)
    FCFuncElement("Enter", JIT_MonEnter)
    FCFuncElement("Exit", JIT_MonExit)
    FCFuncElement("TryEnterTimeout", JIT_MonTryEnter)
    FCFuncElement("ObjWait", ObjectNative::WaitTimeout)
    FCFuncElement("ObjPulse", ObjectNative::Pulse)
    FCFuncElement("ObjPulseAll", ObjectNative::PulseAll)
    FCFuncElement("ReliableEnter", JIT_MonReliableEnter)
FCFuncEnd()

ObjectNative живет в clr\src\vm\comobject.cpp:

FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE)
{
    CONTRACTL
    {
        MODE_COOPERATIVE;
        DISABLED(GC_TRIGGERS);  // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F.
        THROWS;
        SO_TOLERANT;
    }
    CONTRACTL_END;

    OBJECTREF pThis = (OBJECTREF) pThisUNSAFE;
    HELPER_METHOD_FRAME_BEGIN_1(pThis);
    //-[autocvtpro]-------------------------------------------------------

    if (pThis == NULL)
        COMPlusThrow(kNullReferenceException, L"NullReference_This");

    pThis->PulseAll();

    //-[autocvtepi]-------------------------------------------------------
    HELPER_METHOD_FRAME_END();
}
FCIMPLEND

OBJECTREF - это какая-то магия, покрытая сверху Object (оператор -> перегружен), поэтому OBJECTREF->PulseAll() на самом деле Object->PulseAll(), который реализован в clr\src\vm\object.h и просто переадресовывает вызов на ObjHeader->PulseAll:

class Object
{
  // snip   
  public:
  // snip
    ObjHeader   *GetHeader()
    {
        LEAF_CONTRACT;
        return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader));
    }
  // snip
    void PulseAll()
    {
        WRAPPER_CONTRACT;
        GetHeader()->PulseAll();
    }
  // snip
}

ObjHeader::PulseAll извлекает SyncBlock, который использует AwareLock для Enter ing и Exit блокировку объекта. AwareLock (clr\src\vm\syncblk.cpp) использует CLREvent (clr\src\vm\synch.cpp), созданный как MonitorEvent (CLREvent::CreateMonitorEvent(SIZE_T)), который вызывает UnsafeCreateEvent (clr\src\inc\unsafe.h) или методы синхронизации среды размещения.

clr\src\vm\syncblk.cpp:

void ObjHeader::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        THROWS;
        GC_TRIGGERS;
        MODE_ANY;
        INJECT_FAULT(COMPlusThrowOM(););
    }
    CONTRACTL_END;

    //  The following code may cause GC, so we must fetch the sync block from
    //  the object now in case it moves.
    SyncBlock *pSB = GetBaseObject()->GetSyncBlock();

    // GetSyncBlock throws on failure
    _ASSERTE(pSB != NULL);

    // make sure we own the crst
    if (!pSB->DoesCurrentThreadOwnMonitor())
        COMPlusThrow(kSynchronizationLockException);

    pSB->PulseAll();
}

void SyncBlock::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        NOTHROW;
        GC_NOTRIGGER;
        MODE_ANY;
    }
    CONTRACTL_END;

    WaitEventLink  *pWaitEventLink;

    while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL)
        pWaitEventLink->m_EventWait->Set();
}

DequeueThread использует crst (clr\src\vm\crst.cpp), который является оберткой вокруг критических разделов. m_EventWait - это руководство CLREvent.

Итак, все это использует примитивы ОС, если хостинг-провайдер по умолчанию не переопределяет.