Подтвердить что ты не робот

Внедрение IObservable <T> с нуля

Reactive Extensions имеют множество вспомогательных методов для преобразования существующих событий и асинхронных операций в наблюдаемые, но как реализовать IObservable <T> с нуля?

IEnumerable имеет прекрасное ключевое слово yield, чтобы сделать его очень простым в реализации.

Каков правильный способ реализации IObservable <T> ?

Нужно ли беспокоиться о безопасности потоков?

Я знаю, что есть поддержка для вызова в конкретном контексте синхронизации, но это что-то я как IObservable <T> автору нужно беспокоиться или это как-то встроено?

обновление:

Здесь моя версия С# для решения Brian F #

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

изменить: Не бросать ObjectDisposedException, если Dispose вызывается дважды

4b9b3361

Ответ 1

Честно говоря, я не уверен, как "правильно" все это, но если вы чувствуете себя очень хорошо, опираясь на мой опыт. Это код F #, но, надеюсь, вы почувствуете вкус. Он позволяет вам "обновлять" исходный объект, который затем можно вызвать "Next/Completed/Error on", и он управляет подписками и пытается подтвердить, когда источник или клиенты плохо работают.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Меня будут интересовать любые мысли о том, что здесь хорошо или плохо; У меня еще не было возможности взглянуть на все новые материалы Rx от devlabs еще...

Мой собственный опыт подсказывает, что:

  • Те, кто подписывается на наблюдаемые, никогда не должны бросать из подписки. Нет ничего разумного, которое можно наблюдать, когда абонент бросает. (Это похоже на события.) Скорее всего, исключение просто перейдет в верхний уровень обработчика catch-all или сбой приложения.
  • Источники, вероятно, должны быть "логически однопоточными". Я думаю, что может быть сложнее написать клиентов, которые могут реагировать на одновременные вызовы OnNext; даже если каждый индивидуальный вызов поступает из другого потока, полезно избегать одновременных вызовов.
  • Определенно полезно иметь класс base/helper, который обеспечивает выполнение некоторых "контрактов".

Мне очень любопытно, могут ли люди показать более конкретные советы в этом направлении.

Ответ 2

Официальная документация обесценивает пользователей, реализующих IObservable. Вместо этого пользователи должны использовать метод factory Observable.Create

По возможности, реализуйте новые операторы, создавая существующие операторы. В противном случае реализуйте пользовательские операторы, используя Observable.Create

Бывает, что Observable.Create - это тривиальная оболочка вокруг Reactive internal class AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Я не знаю, почему они не сделали свою реализацию публичной, но эй, что угодно.

Ответ 3

Да, ключевое слово yield прекрасное; может быть, что-то похожее на IObservable (OfT)? [Eric Meijer Обсуждение PDC '09 он говорит "да, посмотрите это пространство" на декларативный выход для генерации наблюдаемых.]

Для чего-то близкого (вместо того, чтобы кататься самостоятельно), проверьте нижний из (еще не) 101 Rx Samples "wiki", где команда предлагает использовать класс Subject (T) как "бэкэнд" для реализации IObservable (OfT). Вот их пример:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}

Ответ 4

  • Взломайте рефлектор и посмотрите.

  • Посмотрите несколько видеороликов C9 - этот показывает, как вы можете "вывести" комбинатор"

  • Секрет состоит в том, чтобы создавать классы AnonymousObservable, AnonymousObserver и AnonymousDisposable (которые просто работают вокруг того, что вы не можете создавать экземпляры интерфейсов). Они содержат нулевую реализацию, поскольку вы передаете это с помощью Actions и Funcs.

Например:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Я позволю вам разобраться в остальном... это очень хорошее упражнение в понимании.

Там хорошая нить, растущая здесь со связанными вопросами.

Ответ 5

просто одно замечание относительно этой реализации:

после того, как параллельные коллекции вводятся в .net fw 4, вероятно, лучше использовать ConcurrentDictioary вместо простого словаря.

он сохраняет блокировки обработки в коллекции.

ади.