То, что я пытаюсь достичь, - это буферизовать входящие события из некоторого IObservable (они входят в пакеты) и выпускать их дальше, но один за другим, даже с интервалом. Вот так:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Так как я довольно новичок в Rx, я не уверен, что уже есть объект или оператор, который делает именно это. Может быть, это может быть сделано композицией?
обновление:
Благодаря Richard Szalay для указания оператора Drain, я нашел еще один пример Джеймсом Майлсом использования Дренажа. Вот как мне удалось заставить его работать в WPF-приложении:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
Мне было весело, потому что упущение параметра планировщика приводит к сбою приложения в режиме отладки без какого-либо исключения (мне нужно узнать, как справляться с исключениями в Rx). Метод Process напрямую изменяет состояние пользовательского интерфейса, но, я думаю, довольно просто сделать из него IObservable (используя ISubject?).
обновление:
Тем временем я экспериментировал с ISubject, класс ниже делает то, что я хотел - он своевременно выпускает буферизованный Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}
if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}
public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}
public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}
Эта наивная реализация удалена из OnCompleted и OnError для ясности, а также разрешена только одна подписка.