Подтвердить что ты не робот

Присоединение потоков Rx

Я пытаюсь моделировать запрос Rx, который не является тривиальным (для меня):

  • В комнате есть мужчины и женщины.
  • Они входят и выходят из комнаты, а иногда в комнате они меняют свое местоположение.
  • Каждый мужчина может смотреть на одну (или ноль) женщину в определенное время.
  • Каждый человек обладает следующими свойствами:

    class Man
    {
      public const int LookingAtNobody = 0;
      public int Id { get; set; }
      public double Location { get; set; }
      public int LookingAt { get; set; }
    }
    
  • У каждой женщины есть следующие свойства:

    class Woman
    {
      public int Id { get; set; }
      public double Location { get; set; }
    }
    
  • Для представления Men имеем IObservable<IObservable<Man>>, а для представления Women имеем IObservable<IObservable<Woman>>.

Как бы вы использовали Rx для генерации векторов от мужчин к женщинам, на которых они смотрят: IObservable<IObservable<Tuple<double,double>>>?

Чтобы помочь, вот несколько модульных тестов для некоторых простых случаев:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(300));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle2()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(350),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle3()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle4()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
            OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
            OnCompleted<Man>(500));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var w2 = scheduler.CreateHotObservable(
            OnNext(155, new Woman { Id = 20, Location = 100.0 }),
            OnNext(255, new Woman { Id = 20, Location = 200.0 }),
            OnNext(355, new Woman { Id = 20, Location = 300.0 }),
            OnCompleted<Woman>(455));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        var expectedVector2 = new[]
                       {
                           OnNext(300, Tuple.Create(3.0, 200.0)),
                           OnNext(355, Tuple.Create(3.0, 300.0)),
                           OnNext(400, Tuple.Create(4.0, 300.0)),
                           OnCompleted<Tuple<double,double>>(455),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
        ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
    }

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
    {
        // assuming nested sequences are hot
        var vectors =
            from manDuration in men
            join womanDuration in women on manDuration equals womanDuration
            select from man in manDuration
                   join woman in womanDuration on manDuration equals womanDuration
                   where man.LookingAt == woman.Id
                   select Tuple.Create(man.Location, woman.Location);

        var query = vectors.Select(vectorDuration =>
        {
            var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
            vectorDuration.Subscribe(vectorResults);
            return vectorResults.Messages;
        });

        var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
        return results;
    }
}

(примечание: этот вопрос был отправлен на форумы Rx: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

4b9b3361

Ответ 1

Если я правильно вас понимаю, цель состоит в том, чтобы создать наблюдаемое "следовать наблюдаемым", где "следовать за наблюдаемым" начинается, когда мужчина начинает смотреть на женщину и заканчивается, когда мужчина перестает смотреть на женщину. "Следующее наблюдение" должно состоять из кортежей самых последних мест мужчины и женщины.

Идея здесь заключается в использовании CombineLatest, который будет принимать два наблюдаемых, и когда какое-либо из них произведет значение, комбинатор оценивается для двух последних значений наблюдаемых, что дает значение в комбинированном наблюдаемом, Однако CombineLatest завершается только тогда, когда оба наблюдаемых завершены. В этом случае мы хотим завершить наблюдаемое, когда любой из двух источников завершен. Для этого мы определяем следующий метод расширения (я не считаю, что такой метод уже существует, но может быть проще):

public static IObservable<TSource>
  UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
                                       IObservable<TWhile> lifetime)
{
  return Observable.Create<TSource>(observer =>
  {
    var subscription = source.Subscribe(observer);
    var limiter = lifetime.Subscribe(next => { }, () =>
    {
      subscription.Dispose();
      observer.OnCompleted();
    });
    return new CompositeDisposable(subscription, limiter);
  });
}

Этот метод похож на TakeUntil, но вместо того, чтобы принимать до тех пор, пока lifetime не выдает значение, оно выполняется до тех пор, пока lifetime не завершится. Мы также можем определить простой метод расширения, который принимает первую строку, которая удовлетворяет предикату:

public static IObservable<TSource>
  Streak<TSource>(this IObservable<TSource> source,
                       Func<TSource, bool> predicate)
{
  return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate);
}

Теперь для окончательного запроса мы объединяем всех мужчин со всеми женщинами, используя CombineLatest, и завершаем это с раннего раннего использования с помощью UntilCompleted. Чтобы получить "следовать наблюдаемым", мы выбираем полосу, где мужчина смотрит на женщину. Затем мы просто сопоставляем это с кортежем местоположений.

var vectors =
  from manDuration in men
  from womanDuration in women
  select manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streak(pair => pair.Man.LookingAt == pair.Woman.Id)
  .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location));

Это проходит все ваши тесты, но не обрабатывает сценарий, когда мужчина смотрит на женщину 10 на некоторое время, затем на 20 на некоторое время, а затем в 10 на некоторое время снова; используется только первая полоса. Чтобы наблюдать все полосы, мы можем использовать следующий метод расширения, который возвращает наблюдаемые полосы:

public static IObservable<IObservable<TSource>>
  Streaks<TSource>(this IObservable<TSource> source,
                        Func<TSource, bool> predicate)
{
  return Observable.Create<IObservable<TSource>>(observer =>
  {
    ReplaySubject<TSource> subject = null;
    bool previous = false;
    return source.Subscribe(x =>
    {
      bool current = predicate(x);
      if (!previous && current)
      {
        subject = new ReplaySubject<TSource>();
        observer.OnNext(subject);
      }
      if (previous && !current) subject.OnCompleted();
      if (current) subject.OnNext(x);
      previous = current;
    }, () =>
    {
      if (subject != null) subject.OnCompleted();
      observer.OnCompleted();
    });
  });
}

При подписке только один раз на поток источника и с помощью ReplaySubject этот метод работает как для горячих, так и для холодных наблюдаемых. Теперь для окончательного запроса мы выбираем все полосы следующим образом:

var vectors =
  from manDuration in men
  from womanDuration in women
  from streak in manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id)
  select streak.Select(pair =>
    Tuple.Create(pair.Man.Location, pair.Woman.Location));

Ответ 2

Я не уверен, что понимаю, почему вы моделируете поток мест как мужчин, так и женщин как IObservable<IObservable<T>>, а не только IObservable<T>, но это может сработать:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                                                                            IObservable<IObservable<Woman>> womenObservable)
{
    return Observable.CombineLatest(
        menObservable.Switch(),
        womenObservable.Switch(),
        (man, woman) => new {man, woman})
            .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id)
            .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location));
}

Переключатели по существу "переключаются" на новые наблюдаемые при его нажатии, что выравнивает потоки. Где и выбрать довольно просто.

У меня есть подозрительное подозрение, что я не понимаю что-то о требованиях, но я решил, что отправлю свой ответ на всякий случай, если это поможет.