Подтвердить что ты не робот

Rx IObservable буферизация для сглаживания всплесков событий

У меня есть наблюдаемая последовательность, которая создает события в быстрых всплесках (т.е. пять событий один за другим, затем длинная задержка, затем еще один быстрый всплеск событий и т.д.). Я хочу сгладить эти всплески, вставив короткую задержку между событиями. Представьте себе следующую диаграмму:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

Мой текущий подход состоит в том, чтобы генерировать метроноподобный таймер через Observable.Interval(), который сигнализирует, когда это нормально, чтобы вытащить другое событие из необработанного потока. Проблема в том, что я не могу понять, как скомбинировать этот таймер с моей необработанной необнаруживаемой наблюдаемой последовательностью.

IObservable.Zip() близок к выполнению того, что я хочу, но он работает только до тех пор, пока поток raw генерирует события быстрее, чем таймер. Как только в необработанном потоке происходит значительное затишье, таймер создает ряд нежелательных событий, которые затем сразу соединяются со следующим пакетом событий из необработанного потока.

В идеале я хочу использовать метод расширения IObservable со следующей сигнатурой функции, которая дает bevaior, который я изложил выше. Теперь, прибегите к моему спасению StackOverflow:)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS. Я новичок в Rx, поэтому извиняюсь, если это простой вопрос...


1. Простой, но ошибочный подход

Здесь мое первоначальное наивное и упрощенное решение, имеющее немало проблем:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

Первая очевидная проблема заключается в том, что IDisposable, возвращаемый внутренней подпиской на исходный источник, теряется, и поэтому подписка не может быть прервана. Вызов Dispose на IDisposable, возвращаемый этим методом, убивает таймер, но не базовый необработанный фид событий, который теперь бесполезно заполняет очередь, и никто не оставил, чтобы вывести события из очереди.

Вторая проблема заключается в том, что для исключений исключений или уведомлений о прекращении потока нет возможности передавать через поток необработанных событий в буферный поток - они просто игнорируются при подписке на исходный источник.

И последнее, но не менее важное: теперь у меня есть код, который периодически просыпается, независимо от того, есть ли на самом деле какая-либо работа, которую я бы предпочел избежать в этом замечательном новом мире реакции.


2. Путь чрезмерно сложный appoach

Чтобы решить проблемы, возникшие в моем первоначальном упрощенном подходе, я написал более сложную функцию much, которая ведет себя так же, как IObservable.Delay() (я использовал .NET Reflector для чтения этого кода и использовал его как основы моей функции). К сожалению, многие логические схемы, такие как AnonymousObservable, не доступны для публики вне системного кода. Поэтому мне пришлось копировать и вставлять код lot. Это решение, похоже, работает, но, учитывая его сложность, я менее уверен, что его ошибка бесплатна.

Я просто не могу поверить, что нет возможности выполнить это, используя некоторую комбинацию стандартных Reactive extensions. Ненавижу ощущение, что я совершенно не изобретаю колесо, и образец, который я пытаюсь построить, кажется довольно стандартным.

4b9b3361

Ответ 1

На самом деле это дубликат Способ буферизованных событий равномерно, но я включу здесь резюме (оригинал выглядит довольно запутанным, потому что он выглядит на несколько альтернатив).

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Drain(x => 
        Observable.Empty<int>()
            .Delay(minDelay)
            .StartWith(x)
    );
}

Моя реализация Drain работает как SelectMany, за исключением того, что она ожидает завершения предыдущего вывода (вы можете думать о нем как ConactMany, тогда как SelectMany больше похожа на MergeMany). Встроенный Drain не работает таким образом, поэтому вам нужно включить реализацию ниже:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}