Подтвердить что ты не робот

Способ равномерного сдвига буферизованных событий

То, что я пытаюсь достичь, - это буферизовать входящие события из некоторого IObservable (они входят в пакеты) и выпускать их дальше, но один за другим, даже с интервалом. Вот так:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

Так как я довольно новичок в Rx, я не уверен, что уже есть объект или оператор, который делает именно это. Может быть, это может быть сделано композицией?

обновление:

Благодаря Richard Szalay для указания оператора Drain, я нашел еще один пример Джеймсом Майлсом использования Дренажа. Вот как мне удалось заставить его работать в WPF-приложении:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

Мне было весело, потому что упущение параметра планировщика приводит к сбою приложения в режиме отладки без какого-либо исключения (мне нужно узнать, как справляться с исключениями в Rx). Метод Process напрямую изменяет состояние пользовательского интерфейса, но, я думаю, довольно просто сделать из него IObservable (используя ISubject?).

обновление:

Тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованный Ts:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}

Эта наивная реализация удалена из OnCompleted и OnError для ясности, а также разрешена только одна подписка.

4b9b3361

Ответ 1

На самом деле это трюк, чем кажется.

Использование Delay не работает, потому что значения все равно будут выполняться в массе, только слегка задерживаются.

Использование Interval с помощью CombineLatest или Zip не работает, так как первое приведет к пропущению значений исходного значения, а последнее - к значениям интервала буферизации.

Я думаю, что новый оператор Drain (добавлен в 1.0.2787.0), в сочетании с Delay должен сделать трюк:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

Оператор Drain работает как SelectMany, но ждет, пока предыдущий вывод не завершится до вызова селектора со следующим значением. Это еще не совсем то, что вам нужно (первое значение в блоке также будет отложено), но оно закрывается: Теперь использование теперь соответствует вашей мраморной диаграмме.

Изменить: По-видимому, Drain в фреймворке не работает как SelectMany. Я попрошу совета на официальных форумах. Тем временем, здесь реализация Drain, которая делает то, что вам нужно:

Редактировать 09/11: Исправлены ошибки в реализации и обновленном использовании в соответствии с запрошенной диаграммой мрамора.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

Ответ 2

Только для полноты здесь есть альтернативная (более компактная) версия метода Drain(), предложенная Ричардом:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector
)
{
    return source
        .Select(x => Observable.Defer<T2>(() => selector(x)))
        .Concat();
}

См. поток Drain + SelectMany =? в форуме Rx.

Update: Я понял, что перегрузка Concat(), которую я использовал, была одним из моих личных расширений Rx, которые (еще не) являются частью фреймворка. Я сожалею об этой ошибке. Конечно, это делает мое решение менее элегантным, чем я думал.

Тем не менее для полноты я размещаю здесь мой метод перегрузки расширения Conact():

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        var lockCookie = new Object();
        bool completed = false;
        bool subscribed = false;
        var waiting = new Queue<IObservable<T>>();
        var pendingSubscription = new MutableDisposable();

        Action<Exception> errorHandler = e =>
        {
            o.OnError(e);
            pendingSubscription.Dispose();
        };

        Func<IObservable<T>, IDisposable> subscribe = null;
        subscribe = (ob) =>
        {
            subscribed = true;
            return ob.Subscribe(
                o.OnNext,
                errorHandler,
                () =>
                {
                    lock (lockCookie)
                    {
                        if (waiting.Count > 0)
                            pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                        else if (completed)
                            o.OnCompleted();
                        else
                            subscribed = false;
                    }
                }
            );
        };

        return new CompositeDisposable(pendingSubscription,
            source.Subscribe(
                n =>
                {
                    lock (lockCookie)
                    {
                        if (!subscribed)
                            pendingSubscription.Disposable = subscribe(n);
                        else
                            waiting.Enqueue(n);
                    }

                },
                errorHandler
                , () =>
                {
                    lock (lockCookie)
                    {
                        completed = true;
                        if (!subscribed)
                            o.OnCompleted();
                    }
                }
            )
        );
    });
}

И теперь избивая себя собственным оружием: Тот же метод Concat() может быть написан гораздо более изящным в Ричарде Салае блестящим способом:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => 
                v.Do(_ => { }, () => queue.OnNext(new Unit()))
            );
    });
}

Так что кредит принадлежит Ричарду.: -)

Ответ 3

Вот как я это сделал, просто используя явную очередь (ReactiveCollection - просто причудливая версия WPF ObservableCollection - ReactiveCollection.ItemsAdded OnNext для каждого добавленного элемента, как вы можете себе представить):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
    var ret = new ReactiveCollection<T>();
    if (WithDelay == null) {
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
        return ret;
    }

    // On a timer, dequeue items from queue if they are available
    var queue = new Queue<T>();
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
        .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
            if (queue.Count > 0) { 
                ret.Add(queue.Dequeue());
            }
        });

    // When new items come in from the observable, stuff them in the queue.
    // Using the DeferredScheduler guarantees we'll always access the queue
    // from the same thread.
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
        (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());

    return ret;
}