Подтвердить что ты не робот

Создание слабой подписки на IObservable

Что я хочу сделать, так это убедиться, что если единственная ссылка на моего наблюдателя является наблюдаемой, она собирает мусор и перестает получать сообщения.

Скажем, у меня есть элемент управления, в котором есть список, называемый "Сообщения" и этот код:

//Short lived display of messages (only while the user viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}

Что подключается к этому источнику:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}

Я не хочу, чтобы Дисплей сообщений хранился в памяти долго после того, как он больше не виден. В идеале я бы хотел немного расширить, чтобы написать:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));

Я также не хочу полагаться на то, что MessageDisplay является пользовательским элементом управления, так как позже я захочу перейти на установку MVVM с MessageDisplayViewModel, который не будет управлять пользователем.

4b9b3361

Ответ 1

Код ниже вдохновлен исходным сообщением dtb. Единственное изменение заключается в том, что он возвращает ссылку на наблюдателя как часть IDisposable. Это означает, что ссылка на IObserver будет сохранена до тех пор, пока вы держите ссылку на IDisposable, который вы выходите в конце цепи (при условии, что все одноразовые сохраняют ссылку на одноразовое перед ними). Это позволяет использовать методы расширения, такие как Subscribe(M=>DoSomethingWithM(M)), потому что мы сохраняем ссылку на неявно сконструированный IObserver, но мы не сохраняем сильную ссылку от источника на IObserver (который создавал бы лук-память).

using System.Reactive.Linq;

static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}

class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }

    private IDisposable InnerDisposable;
    private object Reference;

    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}

class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

Ответ 2

Вы можете подписаться на наблюдателя-наблюдателя с наблюдаемым, который имеет слабую ссылку на фактического наблюдателя и распределяет подписку, когда фактический наблюдатель больше не жив:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}

class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;

    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }

    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }

    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }

    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }

    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

Ответ 3

Пробег через эту тему пару лет спустя... просто хотел указать на решение, указанное в блоге Samuel Jack, который добавляет расширение метод для IObservable под названием WeaklySubscribe. Он использует подход добавления прокладки между объектом и наблюдателем, который отслеживает цель с помощью WeakReference. Это похоже на решения, предлагаемые другими для проблемы сильных ссылок в подписках на события, например, в в этой статье или это решение Paul Stovell. Некоторое время, используя что-то, основанное на подходе Павла, мне нравится решение Samuel для слабых подписчиков IObservable.

Ответ 4

это моя реализация (закройте простой)

public class WeakObservable<T>: IObservable<T>
{
    private IObservable<T> _source;

    public WeakObservable(IObservable<T> source)
    {
        #region Validation

        if (source == null)
            throw new ArgumentNullException("source");

        #endregion Validation

        _source = source;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IObservable<T> source = _source;
        if(source == null)
            return Disposable.Empty;
        var weakObserver = new WaekObserver<T>(observer);
        IDisposable disp = source.Subscribe(weakObserver);
        return disp;
    }
}
    public class WaekObserver<T>: IObserver<T>
{
    private WeakReference<IObserver<T>> _target;

    public WaekObserver(IObserver<T> target)
    {
        #region Validation

        if (target == null)
            throw new ArgumentNullException("target");

        #endregion Validation

        _target = new WeakReference<IObserver<T>>(target);
    }

    private IObserver<T> Target
    {
        get
        {
            IObserver<T> target;
            if(_target.TryGetTarget(out target))
                return target;
            return null;
        }
    }

    #region IObserver<T> Members

    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnCompleted();
    }

    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnError(error);
    }

    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;

        target.OnNext(value);
    }

    #endregion IObserver<T> Members
}
    public static class RxExtensions
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
    {
        return new WeakObservable<T>(source);
    }
}
        static void Main(string[] args)
    {
        Console.WriteLine("Start");
        var xs = Observable.Interval(TimeSpan.FromSeconds(1));
        Sbscribe(xs);

        Thread.Sleep(2020);
        Console.WriteLine("Collect");
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Sbscribe<T>(IObservable<T> source)
    {
        source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
    }

Ответ 5

Ключ должен признать, что вам нужно пройти как в целевом, так и в двухпараметрическом действии. Однопараметрическое действие никогда не сделает этого, потому что либо вы используете слабую ссылку на свое действие (и действие получило GC'd), либо вы используете сильную ссылку на свое действие, которое, в свою очередь, имеет сильную ссылку на цель, поэтому цель не может получить GC'd. Помня об этом, следующие работы:

using System;

namespace Closures {
  public static class WeakReferenceExtensions {
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary>
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class {
      TTarget r = null;
      if (reference != null) {
        reference.TryGetTarget(out r);
      }
      return r;
    }
  }
  public static class ObservableExtensions {

    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action)
      where T : class {
      var weakRef = new WeakReference<T>(target);
      var r = source.Subscribe(u => {
        var t = weakRef.TryGetTarget();
        if (t != null) {
          action(t, u);
        }
      });
      return r;
    }
  }
}

Пример наблюдения:

using System;
using System.Reactive.Subjects;

namespace Closures {
  public class Observable {
    public IObservable<int> ObservableProperty => _subject;
    private Subject<int> _subject = new Subject<int>();
    private int n;
    public void Fire() {
      _subject.OnNext(n++);
    }
  }
}

Использование:

Class SomeClass {

 IDisposable disposable;

 public void SomeMethod(Observable observeMe) {
   disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n));
 }

  public void Log(int n) {
    System.Diagnostics.Debug.WriteLine("log "+n);
  }
}