Подтвердить что ты не робот

Как гарантировать, что обновление элемента "ссылочный тип" в массиве видимо для других потоков?

private InstrumentInfo[] instrumentInfos = new InstrumentInfo[Constants.MAX_INSTRUMENTS_NUMBER_IN_SYSTEM];

public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info)
{
    if (instrument == null || info == null)
    {
        return;
    }
    instrumentInfos[instrument.Id] = info;  // need to make it visible to other threads!
}

public InstrumentInfo GetInstrumentInfo(Instrument instrument)
{
    return instrumentInfos[instrument.Id];  // need to obtain fresh value!
}

SetInstrumentInfo и GetInstrumentInfo вызываются из разных потоков. InstrumentInfo является неизменным классом. Могу ли я получить самую последнюю копию при вызове GetInstrumentInfo? Боюсь, что я могу получить "кэшированную" копию. Должен ли я добавить некоторую синхронизацию?

Объявление instrumentInfos как volatile не помогло, потому что мне нужно объявлять элементы массива как volatile, а не сам массив.

У моего кода есть проблемы, и если да, то как его исправить?

UPD1:

Мне нужен мой код для работы в реальной жизни, чтобы он не соответствовал всем спецификациям! Так что, если мой код работает в реальной жизни, но не будет работать "теоретически" на каком-то компьютере в какой-то среде - это нормально!

  • Мне нужен мой код для работы на современном сервере X64 (в настоящее время 2 процессора HP DL360p Gen8) под Windows с использованием последней .NET Framework.
  • Мне не нужно работать с моим кодом под странными компьютерами или Mono или что-то еще
  • Я не хочу вводить латентность, поскольку это программное обеспечение HFT. Так как "реализация Microsoft использует сильную модель памяти для записи. Это означает, что записи обрабатываются так, как будто они были изменчивыми". Вероятно, мне не нужно добавлять дополнительные Thread.MemoryBarrier, которые не будут делать ничего, кроме добавления задержки. Я думаю, мы можем полагаться на то, что Microsoft продолжит использовать "сильную модель памяти" в будущих выпусках. По крайней мере, маловероятно, что Microsoft изменит модель памяти. Поэтому допустим, что этого не произойдет.

UPD2:

Самое последнее предложение - использовать Thread.MemoryBarrier();. Теперь я не понимаю точных мест, где я должен вставить его, чтобы моя программа работала в конфигурации стандартной (x64, Windows, Microsoft.NET 4.0). Помните, что я не хочу вставлять строки "просто для запуска вашей программы на IA64 или .NET 10.0". Для меня важнее скорость, чем переносимость. Однако было бы интересно также обновить мой код, чтобы он работал на любом компьютере.

UPD3

Решение .NET 4.5:

    public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info)
    {
        if (instrument == null || info == null)
        {
            return;
        }
        Volatile.Write(ref instrumentInfos[instrument.Id], info);
    }

    public InstrumentInfo GetInstrumentInfo(Instrument instrument)
    {
        InstrumentInfo result = Volatile.Read(ref instrumentInfos[instrument.Id]);
        return result;
    }
4b9b3361

Ответ 1

Это вопрос с длинным и сложным ответом, но я постараюсь переделать его в какой-нибудь полезный совет.

1. Простое решение: только доступ к инструментамInfos под замком

Самый простой способ избежать непредсказуемости в многопоточных программах - всегда защищать совместное состояние с помощью блокировок.

Основываясь на ваших комментариях, похоже, что вы считаете это решение слишком дорогостоящим. Возможно, вы захотите дважды проверить это предположение, но если это действительно так, тогда рассмотрим остальные параметры.

2. Расширенное решение: Thread.MemoryBarrier

В качестве альтернативы вы можете использовать Thread.MemoryBarrier:

private InstrumentInfo[] instrumentInfos = new InstrumentInfo[Constants.MAX_INSTRUMENTS_NUMBER_IN_SYSTEM]; 

public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info) 
{ 
    if (instrument == null || info == null) 
    { 
        return; 
    } 

    Thread.MemoryBarrier(); // Prevents an earlier write from getting reordered with the write below

    instrumentInfos[instrument.Id] = info;  // need to make it visible to other threads! 
} 

public InstrumentInfo GetInstrumentInfo(Instrument instrument) 
{ 
    InstrumentInfo info = instrumentInfos[instrument.Id];  // need to obtain fresh value! 
    Thread.MemoryBarrier(); // Prevents a later read from getting reordered with the read above
    return info;
}

Использование Thread.MemoryBarrier перед записью и после чтения предотвращает потенциальную проблему. Первый барьер памяти предотвращает переписывание записи в записи записи, которая инициализирует поле объекта записью, которая публикует объект в массиве, а второй барьер памяти препятствует тому, чтобы поток чтения переупорядочивал чтение, которое принимает объект из массива с любым последующие чтения полей этого объекта.

В качестве дополнительной заметки .NET 4 также предоставляет Thread.VolatileRead и Thread.VolatileWrite, которые используют Thread.MemoryBarrier, как показано выше. Тем не менее, нет перегрузки Thread.VolatileRead и Thread.VolatileWrite для ссылочных типов, отличных от System.Object.

3. Расширенное решение (.NET 4.5): Volatile.Read и Volatile.Write

.NET 4.5 предоставляет методы Volatile.Read и Volatile.Write, которые более эффективны, чем полные барьеры памяти. Если вы настроили таргетинг на .NET 4, этот параметр не поможет.

4. Решение "Неправильно, но будет работать"

Вы никогда не должны полагаться на то, что я собираюсь сказать. Но... маловероятно, что вы сможете воспроизвести проблему, которая присутствует в вашем исходном коде.

Фактически, на X64 в .NET 4 я был бы очень удивлен, если бы вы когда-либо могли его воспроизвести. X86-X64 обеспечивает довольно сильные гарантии переупорядочения памяти, и поэтому эти шаблоны публикации работают правильно. Компилятор .NET 4 С# и компилятор .NET 4 CLR JIT также избегают оптимизаций, которые могут нарушить ваш шаблон. Таким образом, ни один из трех компонентов, которым разрешено изменять порядок операций с памятью, не будет.

Тем не менее, существуют (несколько неясные) варианты шаблона публикации, которые фактически не работают на .NET 4 в X64. Таким образом, даже если вы считаете, что код никогда не понадобится запускать на любой архитектуре, отличной от .NET 4 X64, ваш код будет более ремонтопригодным, если вы используете один из правильных подходов, хотя проблема на данный момент не воспроизводится на вашем сервере.

Ответ 2

Мой предпочтительный способ разрешить это с критическим разделом. С# имеет встроенную конструкцию языка под названием "Lock", которая решает эту проблему. Оператор lock гарантирует, что не более одного потока может находиться внутри этого критического раздела одновременно.

private InstrumentInfo[] instrumentInfos = new InstrumentInfo[Constants.MAX_INSTRUMENTS_NUMBER_IN_SYSTEM];

public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info)
{
    if (instrument == null || info == null) {
        return;
    }
    lock (instrumentInfos) {
        instrumentInfos[instrument.Id] = info;
    }
}

public InstrumentInfo GetInstrumentInfo(Instrument instrument)
{
    lock (instrumentInfos) {
        return instrumentInfos[instrument.Id];
    }
}

Это имеет последствия для производительности, но всегда гарантирует, что вы получите надежный результат. Это особенно полезно, если вы когда-либо перебирали объект instrumentInfos; вам абсолютно необходим оператор lock для такого кода.

Обратите внимание, что lock - это решение общего назначения, которое гарантирует, что любые сложные операторы выполняются надежно и атомарно. В вашем случае, поскольку вы устанавливаете указатель на объект, вы можете обнаружить, что можно использовать более простую конструкцию, такую ​​как поточно-безопасный список или массив, при условии, что это единственные две функции, которые когда-либо касаются instrumentInfos. Более подробную информацию можно найти здесь: Безопасны ли потоки массивов С#?

EDIT: ключевой вопрос о вашем коде: что такое InstrumentInfo? Если он получен из object, вам нужно быть осторожным, чтобы всегда создавать новый объект InstrumentInfo каждый раз, когда вы обновляете общий массив, поскольку обновление объекта в отдельном потоке может вызвать проблемы. Если это struct, оператор lock предоставит всю необходимую вам безопасность, так как значения внутри него будут скопированы при записи и чтении.

РЕДАКТИРОВАТЬ 2: Из немного большего количества исследований выяснилось, что существуют некоторые случаи, когда изменения в общей переменной потока не отображаются в некоторых конфигурациях компилятора. В этом разделе обсуждается случай, когда значение "bool" может быть задано в одном потоке и никогда не извлекается на другом: Может ли поток С# кэшировать значение и игнорировать изменения этого значения на другом темы?

Однако я заметил, что этот случай существует только там, где используется тип значения .NET. Если вы посмотрите на ответ Джо Эриксона. декомпиляция этого конкретного кода показывает, почему: разделяемое потоком значение считывается в регистр, а затем никогда не перечитывается, поскольку компилятор оптимизирует ненужное чтение в цикле.

Учитывая, что вы используете массив общих объектов, и учитывая, что вы инкапсулировали все аксессоры, я думаю, что вы совершенно безопасны, используя свои необработанные методы. ОДНАКО: потеря производительности при использовании блокировки настолько мала, что практически забывается. Например, я написал простое тестовое приложение, которое пыталось вызвать SetInstrumentInfo и GetInstrumentInfo 10 миллионов раз. Результаты работы были следующими:

  • Без блокировки 10 миллионов вызовов и вызовов: 2 секунды 149 мс
  • С блокировкой, 10 миллионов установлены и получают вызовы: 2 секунды 195 мс

Это означает грубую производительность примерно:

  • Без блокировки вы можете выполнить 4653327 получать и устанавливать вызовы в секунду.
  • С блокировкой вы можете выполнить 4555808 получать и устанавливать вызовы в секунду.

С моей точки зрения, оператор lock() стоит 2,1% компрометации производительности. По моему опыту, методы get и set, подобные этим, занимают всего лишь мельчайшую часть времени выполнения приложения. Вероятно, вам лучше всего искать потенциальную оптимизацию в другом месте или потратить дополнительные деньги, чтобы получить более высокий тактовый процессор.

Ответ 3

Вы можете использовать примитив синхронизации на системном уровне (например, Mutex); но это немного тяжело. Мьютекс очень медленный, потому что это примитив на уровне ядра. Это означает, что вы можете иметь взаимоисключающий код между процессами. Это вам не нужно, и вы можете использовать что-то гораздо менее дорогостоящее в производительности, например, lock или Monitor.Enter или Monitor.Exit, которое работает в рамках одного процесса.

Вы не можете использовать что-то вроде VolatileRead/Write или MemoryBarrier, потому что вам нужно сделать акт записи для атома коллекции, если элементы в массиве не назначаются атомарно. VolatileRead/Write или MemoryBarrier не делает этого. Это просто дает вам возможность приобретать и выпускать семантику. Это означает, что все, что написано, является "видимым" для других потоков. Если вы не сделаете запись в атомную коллекцию, вы можете испортить семантику данных - получение/выпуск не поможет.

например:.

private InstrumentInfo[] instrumentInfos = new InstrumentInfo[Constants.MAX_INSTRUMENTS_NUMBER_IN_SYSTEM];

private readonly object locker = new object();

public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info)
{
    if (instrument == null || info == null)
    {
        return;
    }
    lock(locker)
    {
        instrumentInfos[instrument.Id] = info;
    }
}

public InstrumentInfo GetInstrumentInfo(Instrument instrument)
{
    lock(locker)
    {
        return instrumentInfos[instrument.Id];
    }
}

Параллельная коллекция будет делать то же самое; но вы несете затраты, потому что каждая операция охраняется синхронизацией. Кроме того, параллельная коллекция знает только об атомарности доступа к элементам, вам все равно нужно обеспечить атомарность на уровне ваших приложений: если вы сделали следующее с одновременным сбором:

public bool Contains(Instrument instrument)
{
   foreach(var element in instrumentInfos)
   {
       if(element == instrument) return true;
   }
}

... у вас будет хотя бы пара проблем. Во-первых, вы не останавливаете SetInstrumentInfo от изменения коллекции во время ее перечисления (многие коллекции не поддерживают это, а бросают и исключают). т.е. сбор только "охраняется" при извлечении одного элемента. Во-вторых, коллекция хранится на каждой итерации. Если у вас есть 100 элементов, а параллельная коллекция использует блокировку, вы получаете 100 блокировок (предполагая, что элемент для поиска последний или вообще не найден). Это будет намного медленнее, чем нужно. Если вы не используете параллельную коллекцию, вы можете просто использовать блокировку, чтобы получить тот же результат:

public bool Contains(Instrument instrument)
{
   lock(locker)
   {
      foreach(var element in instrumentInfos)
      {
          if(element == instrument) return true;
      }
   }
}

и иметь один замок и быть намного более результативным.

UPDATE Я думаю, важно отметить, что если InstrumentInfo является struct, использование lock (или другого примитива синхронизации) становится еще более важным потому что он будет иметь семантику значений, и для каждого присваивания потребуется переместить столько байтов, сколько используется InstrumentInfo для хранения своих полей (т.е. это уже не 32-битное или 64-битное назначение ссылки на родное слово). т.е. простое назначение InstrumentInfo (например, к элементу массива) никогда не будет атомарным и, следовательно, не является потокобезопасным.

ОБНОВЛЕНИЕ 2 Если операция для замены элемента массива является потенциально атомарной (становится просто ссылкой, если InstrumentInfo является ссылкой, а инструкция IL stelem.ref является атомарной). (т.е. акт записи элемента в массив является атомарным w.r.t., о чем я упоминал выше). В этом случае, если только код, который имеет дело с instrumentInfos, является тем, что вы разместили, вы можете использовать Thread.MemoryBarrier():

public void SetInstrumentInfo(Instrument instrument, InstrumentInfo info)
{
    if (instrument == null || info == null)
    {
        return;
    }
    Thread.MemoryBarrier();
    instrumentInfos[instrument.Id] = info;
}

public InstrumentInfo GetInstrumentInfo(Instrument instrument)
{
    var result = instrumentInfos[instrument.Id];
    Thread.MemoryBarrier();
    return result;
}

..., что было бы эквивалентно объявлению каждого элемента в массиве volatile, если бы это было возможно.

VolatileWrite не будет работать, потому что он требует ссылки на переменную, на которую он будет писать, вы не можете дать ему ссылку на элемент массива.

Ответ 4

Насколько я знаю из моих лет программирования драйверов устройств, volatile используется для вещей, которые могут меняться вне контроля процессора, то есть с помощью аппаратного вмешательства или для аппаратной памяти, отображаемой в системное пространство (CSR и т.д.), Когда поток обновляет местоположение памяти, CPU блокирует линию кэша и вызывает прерывание между процессорами, чтобы другие CPU могли его отбросить. Таким образом, нет никаких шансов, что вы прочитаете устаревшие данные. Теоретически вы можете беспокоиться только о параллельной записи и чтениях в одну и ту же позицию массива, если данные не являются атомарными (несколько квадов), так как читатель может читать частично обновленные данные. Я думаю, что это не может произойти в массиве ссылок.

Я буду думать о том, чего вы пытаетесь достичь, потому что он похож на приложение + драйвер, который я разработал в прошлом, для отображения потокового видео с панели тестеров для мобильных телефонов. Приложение видеодисплея может применять некоторые основные манипуляции с изображениями на каждом кадре (баланс белого, пиксели смещения и т.д.). Эти настройки были SET из пользовательского интерфейса и GET из потоков обработки. С точки зрения обрабатывающего потока настройки были неизменными. Похоже, вы пытаетесь сделать что-то подобное со звуком какого-то типа вместо видео.

Подход, который я использовал в моем приложении на С++, заключался в том, чтобы скопировать текущие настройки в структуру, которая сопровождала "фрейм". Таким образом, каждый кадр имел свою собственную копию настроек, которые будут применены к нему. В потоке пользовательского интерфейса была сделана блокировка для записи изменений в настройках, а потоки обработки сделали блокировку для копирования настроек. Блокировка требовалась, потому что несколько квадов были перемещены, и без блокировки у нас есть уверенность в чтении частично обновленных настроек. Не то чтобы это имело бы большое значение, так как, вероятно, никто не заметил бы фальшивый фрейм во время потоковой передачи, но если они приостановили видео или сохранили кадр на диск, тогда они наверняка заметят блестящий зеленый пиксель среди темной стены. В случае звука глюки гораздо легче обнаружить даже во время пропаривания.

Это был случай один. Посмотрим на два случая.

Как вы делаете радикальную реконфигурацию устройства, пока устройство используется неизвестным количеством потоков? Если вы просто сделаете это и сделаете это, то вам гарантировано, что несколько потоков начнутся с конфигурации A и в процессе столкнется с конфигурацией B, которая с высокой степенью уверенности означает смерть. Это то, где вам нужны вещи, такие как блокировки чтения-записи. То есть, примитив синхронизации, который позволит вам дождаться завершения текущей активности, блокируя новую активность. Это суть блокировки чтения-записи.

Конец второго дела. Теперь посмотрим, в чем ваши проблемы, если, конечно, моя дикая догадка правильная.

Если ваши рабочие потоки выполняют GET в начале цикла обработки и удерживаются на этой ссылке для всего цикла, тогда вам не нужны блокировки, поскольку эталонные обновления являются атомарными, как упоминал Питер. Это реализация, которую я вам предлагаю, и это эквивалентно моему копированию структуры настроек в начале обработки каждого кадра.

Однако, если вы делаете несколькоGET из всего кода, тогда у вас проблемы, потому что в течение одного цикла обработки некоторые вызовы возвращают ссылку A, а некоторые вызовы возвращают ссылку B. Если это ОК с вашим приложением, то вы счастливый парень.

Если, однако, у вас возникла проблема с этой проблемой, вам нужно либо исправить ошибку, либо попытаться ее исправить, построив ее поверх нее. Исправить ошибку просто: устранить несколько GET, даже если это стоит вам второстепенной перезаписи, чтобы вы могли передать ссылку.

Если вы хотите исправить ошибку, используйте блокировки Reader-Writer. Каждый поток получает блокировку считывателя, выполняет GET, а затем удерживается на замке до завершения цикла. Затем он освобождает блокировку считывателя. Нить, которая обновляет массив, получает блокировку записи, делает SET и немедленно освобождает блокировку записи.

Решение почти работает, но оно воняет, когда код хранит блокировки в течение длительного периода времени, даже если он считывает блокировку.

Однако, даже если мы забываем о многопоточном программировании, существует еще более тонкая проблема. Поток получает блокировку чтения в начале цикла, отпускает его в конце, затем запускает новый цикл и повторно получает его. Когда появляется сценарий, нить не может начать свой новый цикл до тех пор, пока не будет выполнена запись, что означает, что все остальные потоки читателей, которые работают сейчас, выполняются с их обработкой. Это означает, что некоторые потоки будут испытывать неожиданно большие задержки перед перезапуском своего цикла, потому что даже если они являются потоками с высоким приоритетом, они блокируются запросом блокировки записи, пока все текущие считыватели также не закончат свой цикл. Если вы делаете звук или что-то еще, что критично для времени, тогда вы можете избежать такого поведения.

Надеюсь, я догадался, и мои объяснения были ясными: -)

Ответ 5

По какой-то причине, у меня нет времени, чтобы узнать, я не мог добавлять комментарии нигде, поэтому я добавил еще один ответ. Приносим извинения за неудобства.

UPD3 работает, конечно, но опасно вводит в заблуждение.

Если на самом деле требуется решение UPD3, подумайте о следующем сценарии: ThreadA запускается на CPU1 и выполняет атомное обновление Var1. Затем ThreadA получает выгрузку, и планировщик решает перепланировать его на CPU2. Ooopss... Так как Var1 находится в кеше CPU1, то, согласно полностью ошибочной логике UPD3, ThreadA должен был сделать volatile write в Var1, а затем выполнить изменчивое чтение. Oooopsss... Нить никогда не знает, когда он будет перенесен или на каком процессоре он закончится. Таким образом, в соответствии с полностью ошибочной логикой UPD3, поток должен всегда выполнять волатильные записи и изменчивые чтения, иначе он мог бы считывать устаревшие данные. Это означает, что все потоки должны постоянно обновлять/записывать.

Для одного потока проблема будет еще хуже при обновлении многобайтовой структуры (неатомное обновление). Представьте себе, что некоторые обновления произошли на CPU1, некоторые на CPU2, некоторые на CPU3 и т.д.

Это меня заставляет задуматься, почему мир еще не пришел к концу.

Ответ 6

В качестве альтернативы вы можете использовать некоторые из классов в пространстве имен System.Collections.Concurrent, таких как класс ConcurrentBag<T>. Эти классы построены так, чтобы быть потокобезопасными.

http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx

Ответ 7

В этом случае вы можете использовать ConcurrentDictionary. Это безопасный поток.

Ответ 8

Очень интересный вопрос. Надеюсь, что Эрик или Джон придут, чтобы установить рекорд, но до тех пор позвольте мне попробовать все возможное. Кроме того, позвольте мне префикс этого, сказав, что я не уверен, что этот ответ правильный.

Обычно, имея дело с "я делаю это поле неустойчивым?" вопросы, вы беспокоитесь о таких ситуациях:

SomeClass foo = new SomeClass("foo");

void Thread1Method() {
    foo = new SomeClass("Bar");
    ....
}

void Thread2Method() {
    if (foo.Name == "foo") ...
    ...
    // Here, the old value of foo might be cached 
    // even if Thread1Method has already updated it.
    // Making foo volatile will fix that.
    if (foo.Name == "bar") ...
}

В вашем случае вы не меняете адрес вашего общего массива, но вы меняете значение элемента внутри этого массива, который является управляемым указателем (предположительно) размера родного слова. Эти изменения являются атомарными, но я не уверен, что у них гарантированно есть барьер памяти (семантика получения/выпуска, в соответствии с спецификацией С#) и сброс самого массива. Поэтому я бы предложил добавить volatile в самом массиве.

Если сам массив неустойчив, я не думаю, что поток чтения может кэшировать элемент из массива. Кажется, вам не нужно, чтобы элементы были волатильны, потому что вы не изменяете элементы на месте в массиве - вы просто заменяете их новыми элементами (по крайней мере, взглядами вещей!).

Здесь вы можете столкнуться с проблемой:

var myTrumpet = new Instrument { Id = 5 };
var trumpetInfo = GetInstrumentInfo(myTrumpet);
trumpetInfo.Name = "Trumpet";
SetInstrumentInfo(myTrumpet, trumpetInfo);

Здесь вы обновляете элемент на месте, поэтому я не был бы удивлен, увидев, что происходит кеширование. Я не уверен, действительно ли это будет. MSIL содержит явные коды операций для чтения и записи элементов массива, поэтому семантика может отличаться от стандартных кучных чтений и записи.