Подтвердить что ты не робот

Throttle Rx.Observable без пропусков

Throttle метод пропускает значения из наблюдаемой последовательности, если другие следуют слишком быстро. Но мне нужен метод, чтобы просто задержать их. То есть мне нужно установить минимальную задержку между элементами, не пропуская никаких.

Практический пример: существует веб-служба, которая может принимать запросы не быстрее, чем один раз в секунду; есть пользователь, который может добавлять запросы, одиночные или в пакетах. Без Rx я создам список и таймер. Когда пользователи добавят запросы, я добавлю их в список. В событии таймера я проверю, что список пуст. Если это не так, я отправлю запрос и удалю соответствующий элемент. С замками и всем этим. Теперь, с Rx, я могу создать Subject, добавлять элементы, когда пользователи добавляют запросы. Но мне нужен способ убедиться, что веб-сервис не заливается, применяя задержки.

Я новичок в Rx, поэтому, возможно, мне не хватает чего-то очевидного.

4b9b3361

Ответ 1

Там довольно простой способ сделать то, что вы хотите, используя EventLoopScheduler.

Я начал с наблюдаемого, который будет произвольно производить значения один раз каждые 0 - 3 секунды.

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));

Теперь, чтобы сделать эти выходные значения немедленно, если последнее значение не было в течение секунды назад, я сделал это:

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });

Это эффективно отслеживает источник на EventLoopScheduler, а затем помещает его в режим ожидания в течение 1 секунды после каждого OnNext, чтобы он мог начать только следующий OnNext после его просыпания.

Я тестировал, что он работал с этим кодом:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));

Надеюсь, это поможет.

Ответ 2

Я хочу предложить подход с использованием Observable.Zip:

// Incoming requests
var requests = new[] {1, 2, 3, 4, 5}.ToObservable();

// defines the frequency of the incoming requests
// This is the way to emulate flood of incoming requests.
// Which, by the way, uses the same approach that will be used in the solution
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;});
incomingRequests.Subscribe((number) =>
{
    Console.WriteLine($"Request received: {number}");
});

// This the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));

// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;});

requestsToProcess.Subscribe((number) =>
{
    Console.WriteLine($"Request processed: {number}");
});

Ответ 3

Как насчет простого метода расширения:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

Использование:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));

Ответ 4

Как насчет использования наблюдаемого таймера из очереди блокировки? Код ниже не проверен, но должен дать вам представление о том, что я имею в виду...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ...

Observable
  .Timer(new TimeSpan(0,0,1), new EventLoopScheduler())
  .Do(i => myWebService.Send(workQueue.Take()));

// Then just add items to the queue using workQueue.Add(...)

Ответ 5

Рассмотрим использование буфера.

http://msdn.microsoft.com/en-us/library/hh212130(v=vs.103).aspx

Как видно из названия, оно буферизует элементы в наблюдаемом потоке событий без "удаления" любого из них.

-G

Ответ 6

.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any())
.Subscribe(buffer => 
{
     foreach(var item in buffer) Console.WriteLine(item)
});

Ответ 7

Я играл с этим и нашел .Zip(как упоминалось ранее), чтобы быть самым простым методом:

var stream = "ThisFastObservable".ToObservable();
var slowStream = 
    stream.Zip(
        Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
        (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y)

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}"));

вывод:

T arrived after 00:00:01.0393840
h arrived after 00:00:00.9787150
i arrived after 00:00:01.0080400
s arrived after 00:00:00.9963000
F arrived after 00:00:01.0002530
a arrived after 00:00:01.0003770
s arrived after 00:00:00.9963710
t arrived after 00:00:01.0026450
O arrived after 00:00:00.9995360
b arrived after 00:00:01.0014620
s arrived after 00:00:00.9993100
e arrived after 00:00:00.9972710
r arrived after 00:00:01.0001240
v arrived after 00:00:01.0016600
a arrived after 00:00:00.9981140
b arrived after 00:00:01.0033980
l arrived after 00:00:00.9992570
e arrived after 00:00:01.0003520