Подтвердить что ты не робот

Продвинутый исторический поток и прямой эфир в Rx

У меня есть горячее наблюдение, которое я обычно реализую с использованием нормального Subject снизу, так что заинтересованные могут подписаться на живой поток уведомлений.

Теперь я хотел бы сохранить этот живой поток, но также выставить исторический поток всех событий, которые были И имеют абсолютное время, прикрепленное к этим уведомлениям, чтобы знать, когда именно они произошли. А ТАКЖЕ позволяют подписчикам продвигать исторические поток в любой момент времени перед воспроизведением хронологии.

  • Я считаю, что в большинстве случаев это может быть достигнуто с помощью HistoricalScheduler и его метода AdvanceTo, но я точно не знаю, как это сделать
  • И используется Timestamped, чтобы сохранить время необходимых событий?
  • И есть ReplaySubject, необходимый для кэширования живого потока в исторических записях, которые затем могут быть воспроизведены с помощью функции HistoricalScheduler?

Как точно эти два потока могут быть реализованы для одного и того же источника, или, другими словами, как можно присвоить нижеследующие текущие требования?

how to save time in .net

[см. Заголовок "Воспроизведение прошлого"

4b9b3361

Ответ 1

То, что дает HistoricalScheduler, - это возможность управлять прямым движением виртуального времени планировщика.

То, что вы не получаете, - это случайный доступ с течением времени. По мере продвижения виртуального времени запланированные действия выполняются, поэтому они должны быть запланированы заранее. Любое действие, запланированное в прошлом, т.е. В абсолютном времени, которое находится за значением HistoricalScheduler.Now, выполняется немедленно.

Чтобы воспроизвести события, вам нужно как-то их записать, а затем запланировать их с помощью экземпляра HistoricalScheduler - и затем увеличить время.

Когда вы продвигаете время, запланированные действия выполняются в назначенное время - и когда наблюдаемые отправляют OnXXX() своим подписчикам, свойство Now планировщика будет иметь текущее виртуальное время.

Каждому абоненту нужен доступ к собственному планировщику, чтобы контролировать время независимо от других абонентов. Это фактически означает создание наблюдаемого для каждого абонента.

Вот быстрый пример, который я выбил (который будет работать в LINQPad, если вы указали пакет nuget rx-main).

Сначала я записываю поток в прямом эфире (совершенно непродуктивным способом!), записывая события в список. Как вы полагаете, использование TimeStamp() хорошо подходит для синхронизации времени:

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

Теперь мы можем использовать HistoricalScheduler в сочетании с хитрым использованием Generate для планирования событий. Обратите внимание, что этот подход предотвращает приостановку тонны запланированных событий заблаговременно - вместо этого мы планируем только одно:

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

Теперь, когда мы подписываемся, вы можете видеть, что свойство HistoricalScheduler Now имеет виртуальное время события:

replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i,
    scheduler.Now)); 

Наконец, мы можем запустить расписание (с помощью Start() просто пытается воспроизвести все события, в отличие от использования AdvanceTo для перехода к определенному времени - ему нравится делать AdvanceTo(DateTime.MaxValue);

scheduler.Start();

Выход для меня был:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00

Результат заключается в том, что вам, вероятно, придется создать свой собственный API над этим инструментом, чтобы получить что-то, что соответствует вашим конкретным целям. Это оставляет вам немного работы - но тем не менее довольно мощный материал.

Какая приятность в том, что живой наблюдаемый и воспроизводимый наблюдаемый действительно не отличаются друг от друга - при условии, что вы помните, чтобы всегда параметризовать ваш планировщик (!) - и, таким образом, могут выполняться одинаковые запросы с их временными запросами работая с виртуальным временем планировщика.

Я использовал это, чтобы проверить новые запросы по старым данным, чтобы иметь большой эффект в коммерческих сценариях.

То, что он не пытается, - это управление транспортом, например, чтобы прокручивать назад и вперед во времени в графическом интерфейсе. Как правило, вы запускаете историю в больших кусках, сохраняете вывод новых запросов, а затем используете эти данные для последующего отображения в графическом интерфейсе, чтобы пользователи могли перемещаться вперед и назад в свободное время через какой-либо другой механизм, который вы предоставляете.

Наконец, вам не нужно ReplaySubject кэшировать прямой эфир; но вам понадобятся некоторые средства записи событий для воспроизведения - это может быть просто наблюдатель, который записывает в журнал.