Подтвердить что ты не робот

Реактивные расширения: обрабатывать события партиями + добавить задержку между каждой партией

У меня есть приложение, которое в некоторых точках вызывает 1000 событий почти одновременно. То, что я хотел бы сделать, состоит в том, чтобы выгрузить события в куски из 50 элементов и начать обрабатывать их каждые 10 секунд. Нет необходимости ждать завершения партии до начала новой пакетной обработки.

Например:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

Любые идеи, как достичь этого? Я полагаю, что Reactive Extensions - это путь, но другие решения также приемлемы.

Я попытался начать здесь:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

Но заметил, что задержка не срабатывала, как я надеялся, и вместо этого все партии запускались одновременно, хотя 5 секунд задерживались.

Я также тестировал Window-метод, но я не заметил никакой разницы в поведении. Я полагаю, что TimeSpan в окне фактически означает, что "принимать каждое событие, которое происходит в следующие 10 секунд:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

Я использую Rx-Main 2.0.20304-бета.

4b9b3361

Ответ 1

Если вы предпочитаете не спать в потоках, вы можете сделать это:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);

Ответ 2

Попробуйте следующее:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });