Подтвердить что ты не робот

Механизм синхронизации наблюдаемого объекта

Предположим, нам нужно синхронизировать доступ для чтения/записи к общим ресурсам. Несколько потоков будут обращаться к этому ресурсу как при чтении, так и в записи (чаще всего для чтения, иногда для записи). Предположим также, что каждая запись всегда вызывает операцию чтения (объект является наблюдаемым).

В этом примере я представлю себе такой класс (простить синтаксис и стиль, это просто для иллюстрации):

class Container {
    public ObservableCollection<Operand> Operands;
    public ObservableCollection<Result> Results;
}

Я пытаюсь использовать ReadWriterLockSlim для этой цели, кроме того, я бы поставил его на уровне Container (представьте, что объект не так прост, и одна операция чтения/записи может включать несколько объектов):

public ReadWriterLockSlim Lock;

Реализация Operand и Result не имеет смысла для этого примера. Теперь представьте себе какой-то код, который соблюдает Operands и произведет результат, чтобы вставить Results:

void AddNewOperand(Operand operand) {
    try {
        _container.Lock.EnterWriteLock();
        _container.Operands.Add(operand);
    }
    finally {
        _container.ExitReadLock();
    }
}

Наш гипотонический наблюдатель сделает что-то подобное, но будет потреблять новый элемент, и он будет блокироваться с помощью EnterReadLock() для получения операндов, а затем EnterWriteLock() для добавления результата (позвольте мне пропустить код для этого). Это приведет к исключению из-за рекурсии, но если я установил LockRecursionPolicy.SupportsRecursion, тогда я просто открою свой код для мертвых блокировок (из MSDN):

По умолчанию новые экземпляры ReaderWriterLockSlim создаются с помощью флага LockRecursionPolicy.NoRecursion и не позволяют рекурсии. Эта политика по умолчанию рекомендуется для всех новых разработок, потому что рекурсия вводит ненужные сложности, а делает ваш код более склонным к взаимоблокировкам.

Я повторяю соответствующую часть для ясности:

Рекурсия [...] делает ваш код более склонным к взаимоблокировкам.

Если я не ошибаюсь в LockRecursionPolicy.SupportsRecursion, если из того же потока я спрашиваю, допустим, блокировку чтения, тогда кто-то еще спрашивает о блокировке записи, тогда у меня будет мертвый замок, а то, что говорит MSDN, имеет смысл. Более того, рекурсия также ухудшит производительность также измеримым образом (и это не то, что я хочу, если я использую ReadWriterLockSlim вместо ReadWriterLock или Monitor).

Вопрос (ы)

Наконец, мои вопросы (обратите внимание, что я не ищу обсуждения общих механизмов синхронизации, я бы знал, что неправильно для этого сценария производителя/наблюдаемого/наблюдателя):

  • Что лучше в этой ситуации? Чтобы избежать ReadWriterLockSlim в пользу Monitor (даже если в реальном мире чтение кода будет намного больше, чем пишет)?
  • Откажитесь от такой грубой синхронизации? Это может даже повысить производительность, но это сделает код намного сложнее (конечно, не в этом примере, а в реальном мире).
  • Должен ли я просто делать уведомления (из наблюдаемой коллекции) асинхронными?
  • Что-то еще я не вижу?

Я знаю, что нет лучшего механизма синхронизации, поэтому инструмент, который мы используем, должен быть правильным для нашего случая, но мне интересно, есть ли какая-то лучшая практика или я просто игнорирую что-то очень важное между потоками и наблюдателями (представьте, что используйте Microsoft Reactive Extensions, но вопрос является общим, не привязанным к этой структуре).

Возможные решения?

Я бы попытался сделать события (как-то) отложенными:

1-е решение
Каждое изменение не запускает событие CollectionChanged, оно хранится в очереди. Когда поставщик (объект, который выталкивает данные) завершил, он вручную заставит очередь быть сброшенной (последовательно поднимая каждое событие). Это может быть сделано в другом потоке или даже в потоке вызывающего (но вне блокировки).

Он может работать, но он сделает все менее "автоматическим" (каждое уведомление об изменении должно запускаться вручную самим производителем, больше кода для записи, больше ошибок).

2-е решение
Другое решение может заключаться в предоставлении ссылки на наш замок на наблюдаемую коллекцию. Если я обернул ReadWriterLockSlim в пользовательский объект (полезно скрыть его в удобном для использования объекте IDisposable), я могу добавить ManualResetEvent, чтобы уведомить, что все блокировки были выпущены таким образом, сама коллекция может вызывать события (снова в том же потоке или в другом потоке).

3-е решение
Еще одна идея - просто сделать события асинхронными. Если обработчику событий потребуется блокировка, он будет остановлен, чтобы подождать его временного интервала. Для этого я беспокоюсь о большом количестве потоков, которое может быть использовано (особенно если из пула потоков).

Честно говоря, я не знаю, применимо ли какое-либо из них в приложении реального мира (лично - с точки зрения пользователей - я предпочитаю второй вариант, но он подразумевает коллекцию для всего, и он делает коллекцию осведомленной о потоковом режиме, и я бы избегал это, если возможно). Я не хотел бы делать код более сложным, чем необходимо.

4b9b3361

Ответ 1

Это похоже на многопоточную рассол. Очень сложно работать с рекурсией в этой цепочке цепочек событий, в то же время избегая взаимоблокировок. Возможно, вы захотите рассмотреть проблему вокруг проблемы.

Например, вы можете сделать добавление операнда асинхронным к началу события:

private readonly BlockingCollection<Operand> _additions
    = new BlockingCollection<Operand>();

public void AddNewOperand(Operand operand)
{
    _additions.Add(operand);
}

И тогда фактическое добавление произойдет в фоновом потоке:

private void ProcessAdditions()
{
    foreach(var operand in _additions.GetConsumingEnumerable())
    {
        _container.Lock.EnterWriteLock();
        _container.Operands.Add(operand);
        _container.Lock.ExitWriteLock();
    }
}

public void Initialize()
{
    var pump = new Thread(ProcessAdditions)
    {
        Name = "Operand Additions Pump"
    };
    pump.Start();
}

Это разделение жертвует некоторой последовательностью - код, запущенный после того, как метод add фактически не узнает, когда добавление действительно произошло, и, возможно, это проблема для вашего кода. Если это так, это можно переписать для подписки на наблюдение и использовать Task для сигнала, когда добавление завершается:

public Task AddNewOperandAsync(Operand operand)
{
    var tcs = new TaskCompletionSource<byte>();

    // Compose an event handler for the completion of this task
    NotifyCollectionChangedEventHandler onChanged = null;
    onChanged = (sender, e) =>
    {
        // Is this the event for the operand we have added?
        if (e.NewItems.Contains(operand))
        {
            // Complete the task.
            tcs.SetCompleted(0);

            // Remove the event-handler.
            _container.Operands.CollectionChanged -= onChanged;
        }
    }

    // Hook in the handler.
    _container.Operands.CollectionChanged += onChanged;

    // Perform the addition.
    _additions.Add(operand);

    // Return the task to be awaited.
    return tcs.Task;
}

Логика обработчика событий поднимается в фоновом потоке, накачивая добавляемые сообщения, поэтому нет возможности блокировать потоки переднего плана. Если вы ожидаете добавления в сообщении-насоса для окна, контекст синхронизации достаточно умен, чтобы также планировать продолжение в потоке сообщений-сообщений.

Если вы идете по маршруту Task или нет, эта стратегия означает, что вы можете безопасно добавлять больше операндов из наблюдаемого события без повторного ввода каких-либо блокировок.

Ответ 2

Я не уверен, что это точно такая же проблема, но при работе с относительно небольшими объемами данных (записи 2k-3k) я использовал приведенный ниже код, чтобы облегчить доступ к перекрестному потоку для чтения/записи для коллекций, привязанных к UI, Этот код изначально нашел здесь.

public class BaseObservableCollection<T> : ObservableCollection<T>
{
  // Constructors
  public BaseObservableCollection() : base() { }
  public BaseObservableCollection(IEnumerable<T> items) : base(items) { }
  public BaseObservableCollection(List<T> items) : base(items) { }

  // Evnet
  public override event NotifyCollectionChangedEventHandler CollectionChanged;

  // Event Handler
  protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
  {
    // Be nice - use BlockReentrancy like MSDN said
    using (BlockReentrancy())
    {
      if (CollectionChanged != null)
      {
        // Walk thru invocation list
        foreach (NotifyCollectionChangedEventHandler handler in CollectionChanged.GetInvocationList())
        {
          DispatcherObject dispatcherObject = handler.Target as DispatcherObject;

          // If the subscriber is a DispatcherObject and different thread
          if (dispatcherObject != null && dispatcherObject.CheckAccess() == false)
          {
            // Invoke handler in the target dispatcher thread
            dispatcherObject.Dispatcher.Invoke(DispatcherPriority.DataBind, handler, this, e);
          }
          else
          {
            // Execute handler as is
            handler(this, e);
          }
        }
      }
    }
  }
}

Я также использовал приведенный ниже код (который наследуется от приведенного выше кода) для поддержки создания события CollectionChanged, когда элементы внутри коллекции поднимают PropertyChanged.

public class BaseViewableCollection<T> : BaseObservableCollection<T>
  where T : INotifyPropertyChanged
{
  // Constructors
  public BaseViewableCollection() : base() { }
  public BaseViewableCollection(IEnumerable<T> items) : base(items) { }
  public BaseViewableCollection(List<T> items) : base(items) { }

  // Event Handlers
  private void ItemPropertyChanged(object sender, PropertyChangedEventArgs e)
  {
    var arg = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, sender, sender);
    base.OnCollectionChanged(arg);
  }

  protected override void ClearItems()
  {
    foreach (T item in Items) { if (item != null) { item.PropertyChanged -= ItemPropertyChanged; } }
    base.ClearItems();
  }

  protected override void InsertItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.InsertItem(index, item);
  }

  protected override void RemoveItem(int index)
  {
    if (Items[index] != null) { Items[index].PropertyChanged -= ItemPropertyChanged; }
    base.RemoveItem(index);
  }

  protected override void SetItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.SetItem(index, item);
  }
}

Ответ 3

Синхронизация коллекции кросс-потоков

Привязка привязки ListBox к ObservableCollection, когда данные изменяются, вы обновляете ListBox, потому что INotifyCollectionChanged реализован. Дефект dell'ObservableCollection заключается в том, что данные могут быть изменены только потоком, который его создал.

SynchronizedCollection не имеет проблемы с Multi-Thread, но не обновляет ListBox, потому что он не реализован INotifyCollectionChanged, даже если вы реализуете INotifyCollectionChanged, CollectionChanged (this, e) можно вызвать только из потока, который его создал.. поэтому он не работает.

Заключение

-Если вы хотите, чтобы список, содержащий автономный монопоток, использовал ObservableCollection

-Если вы хотите, чтобы список не был аутентифицирован, но многопоточным, используйте SynchronizedCollection

- Если вы хотите оба, используйте Framework 4.5, BindingOperations.EnableCollectionSynchronization и ObservableCollection() следующим образом:

/ / Creates the lock object somewhere
private static object _lock = new object () ;
...
/ / Enable the cross acces to this collection elsewhere
BindingOperations.EnableCollectionSynchronization ( _persons , _lock )

Полный образец http://10rem.net/blog/2012/01/20/wpf-45-cross-thread-collection-synchronization-redux