Используя Reactive Extensions, я хочу игнорировать сообщения, исходящие из потока событий, которые происходят, когда мой метод Subscribe
запущен. То есть иногда мне требуется больше времени для обработки сообщения, чем время между сообщением, поэтому я хочу отказаться от сообщений, которые у меня нет времени для обработки.
Однако, когда мой метод Subscribe
завершается, если какие-либо сообщения поступают, я хочу обработать последний. Поэтому я всегда обрабатываю последнее сообщение.
Итак, если у меня есть код, который делает:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
и если мы предположим, что "100" требует много времени для обработки. Затем я хочу, чтобы "2" обрабатывался, когда "100" завершается. "1" следует игнорировать, поскольку он был заменен "2", а "100" все еще обрабатывался.
Вот пример результата, который я хочу использовать в фоновом задании, и Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
Однако, последний() является блокирующим вызовом, и я бы предпочел не иметь нить, ожидающую следующего значения, подобного этому (иногда могут возникать очень большие промежутки между сообщениями).
Я также могу получить результат, который я хочу, используя BroadcastBlock
из TPL Dataflow следующим образом:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
но похоже, что это должно быть возможно непосредственно в Rx. Какой лучший способ сделать это?