Подтвердить что ты не робот

RxJava: как составить несколько Observables с зависимостями и собрать все результаты в конце?

Я изучаю RxJava и, как мой первый эксперимент, пытается переписать код в первом методе run() в этом коде (цитируется в блог Netflix как проблема RxJava может помочь решить), чтобы улучшить его асинхронность с использованием RxJava, т.е. он не ждет результата первого Будущего (f1.get()), прежде чем продолжить к остальной части кода.

f3 зависит от f1. Я вижу, как с этим справиться, flatMap, похоже, делает трюк:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

Далее f4 и f5 зависят от f2. У меня есть это:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

Что начинает становиться странным (merge их, вероятно, не то, что я хочу...), но позволяет мне делать это в конце, не совсем то, что я хочу:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

Это дает мне:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

который является всеми числами, но, к сожалению, я получаю результаты в отдельных вызовах, поэтому я не могу полностью заменить окончательный println в исходном коде:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

Я не понимаю, как получить доступ к обеим возвращаемым значениям в одной строке. Я думаю, что там, вероятно, есть некоторые функциональные программы, которые мне не хватает. Как я могу это сделать? Спасибо.

4b9b3361

Ответ 1

Похоже, все, что вам действительно нужно, - это немного больше поддержки и перспективы использования RX. Я бы посоветовал вам больше ознакомиться с документацией, а также с мраморными диаграммами (я знаю, что они не всегда полезны). Я также предлагаю изучить функцию lift() и операторы.

  • Вся точка наблюдаемого заключается в объединении потока данных и манипуляции данными с одним объектом.
  • Точка вызовов map, flatMap и filter должна обрабатывать данные в потоке данных.
  • Точка слияния состоит в объединении потоков данных
  • Точка операторов должна позволять вам нарушать постоянный поток наблюдаемых и определять собственные операции над потоком данных. Например, я закодировал оператора скользящей средней. Это суммирует n double в наблюдении удвоений, чтобы вернуть поток скользящих средних. Код буквально выглядел следующим образом:

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

Вы почувствуете облегчение, что многие методы фильтрации, которые вы считаете само собой разумеющимися, имеют lift() под капотом.

Сказанное; все, что требуется для объединения нескольких зависимостей:

  • изменение всех входящих данных на стандартный тип данных с помощью map или flatMap
  • объединение стандартных типов данных в поток
  • с использованием настраиваемых операторов, если одному объекту нужно ждать по другому, или если вам нужно заказывать данные в потоке. Внимание: этот подход замедлит поток вниз.
  • используя список или подписку на сбор всех этих данных.

Ответ 2

Изменить: кто-то преобразовал следующий текст, который я добавил в качестве редактирования в вопросе, в ответ, который я ценю, и понимаю, может быть, правильно, что нужно делать, однако я не считайте это ответом, потому что это явно не правильный способ сделать это. Я бы никогда не использовал этот код и не советую кому-либо его скопировать. Другие/лучшие решения и комментарии приветствуются!


Я смог решить эту проблему следующим образом. Я не знал, что вы могли бы наблюдаться не один раз, я полагал, что результаты могут быть использованы только один раз. Поэтому я просто flatMap f2Observable дважды (извините, я переименовал некоторые вещи в коде с момента моего первоначального сообщения), затем zip на всех наблюдаемых, а затем подписаться на это. То, что Map в zip для агрегирования значений нежелательно из-за типа жонглирования. Другие/лучшие решения и комментарии приветствуются! полный код доступен для просмотра. Спасибо.

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     

И это дает мне желаемый результат:

responseB_responseA => 714000

Ответ 3

Я думаю, что вы ищете коммутатор. Мы столкнулись с аналогичной проблемой, когда у нас есть сеансовая служба, которая обрабатывает новый сеанс из api, и нам нужен этот сеанс, прежде чем мы сможем получить больше данных. Мы можем добавить к наблюдаемому сеансу, который возвращает sessionToken для использования в нашем вызове данных.

getSession возвращает наблюдаемый;

public getSession(): Observable<any>{
  if (this.sessionToken)
    return Observable.of(this.sessionToken);
  else if(this.sessionObservable)
    return this.sessionObservable;
  else {
    // simulate http call 
    this.sessionObservable = Observable.of(this.sessonTokenResponse)
    .map(res => {
      this.sessionObservable = null;
      return res.headers["X-Session-Token"];
    })
    .delay(500)
    .share();
    return this.sessionObservable;
  }
}

и getData принимает это наблюдаемое и добавляет к нему.

public getData() {
  if (this.dataObservable)
    return this.dataObservable;
  else {
    this.dataObservable = this.sessionService.getSession()
      .switchMap((sessionToken:string, index:number) =>{
        //simulate data http call that needed sessionToken
          return Observable.of(this.dataResponse)
          .map(res => {
            this.dataObservable = null;
            return res.body;
          })
          .delay(1200)
        })
        .map ( data => {
          return data;
        })
        .catch(err => {
          console.log("err in data service", err);
         // return err;
        })
        .share();
    return this.dataObservable;
  }
}

Вам понадобится плоская карта для объединения не зависимых наблюдаемых.

Plunkr: http://plnkr.co/edit/hiA1jP?p=info

Где я получил идею использовать карту переключателей: http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html