Подтвердить что ты не робот

RxJava: цепные наблюдаемые

Можно ли реализовать что-то вроде следующего цепочки с помощью RxJava:

loginObservable()
   .then( (someData) -> {
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   }).then( (userData) -> {
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   }).then( (userData) -> {
      // it should be called after all previous operations completed
      displayUserData()

   }).doOnError( (error) -> {
      //do something
   })

Я нашел эту библиотеку очень интересной, но не могу понять, как цепочка запросов, где друг от друга зависит от предыдущих.

4b9b3361

Ответ 1

Конечно, RxJava поддерживает .map, который делает это. Из RxJava Wiki:

map

В принципе, это будет:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() {
        @Override
        public void onCompleted() {
           // observable stream has ended - no more logins possible
        }
        @Override
        public void onError(Throwable e) {
            // do something
        }
        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });

Ответ 2

Это верхний пост, когда наблюдаются цепочки Googling RxJava, поэтому я просто добавлю еще один распространенный случай, когда вы не захотите преобразовывать полученные данные, но добавьте в цепочку другое действие (например, настройку данных для базы данных). Используйте .flatmap(). Вот пример:

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // OnErrorResumeNext and Observable.error() would propagate the error to
    // the next level. So, whatever error occurs here, would get passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
    .flatMap { t: List<Quote> ->
        // Chain observable as such
        mDataManager.setQuotesToDb(t).subscribe(
            {},
            { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
            { d { "Done server set" } }
        )
        Observable.just(t)
    }
    .subscribeBy(
        onNext = {},
        onError = { mvpView?.showError("No internet connection") },
        onComplete = { d { "onComplete(): done with fetching quotes from api" } }
    )

Это RxKotlin2, но идея аналогична RxJava & RxJava2:

Краткое объяснение:

  • мы пытаемся получить некоторые данные (цитаты в этом примере) из API с mDataManager.fetchQuotesFromApi()
  • Мы подписываем наблюдаемое, чтобы делать что-то в потоке .io() и показывать результаты в потоке .ui().
  • onErrorResumeNext() гарантирует, что любая ошибка, с которой мы столкнемся при получении данных, будет обнаружена в этом методе. Я хочу завершить всю цепочку, когда там есть ошибка, поэтому я возвращаю Observable.error()
  • .flatmap() является частью цепочки. Я хочу иметь возможность устанавливать любые данные, которые я получаю из API, в свою базу данных. Я не преобразую данные, полученные с помощью .map(), я просто делаю что-то еще с этими данными без их преобразования.
  • Я подписываюсь на последнюю цепочку наблюдаемых. Если при извлечении данных произошла ошибка (первая наблюдаемая), она будет обработана (в этом случае распространена на подписанный onError()) с помощью onErrorResumeNext()
  • Я очень хорошо понимаю, что подписываюсь на наблюдаемую БД (внутри flatmap()). Любая ошибка, возникающая из-за этой наблюдаемой, НЕ будет распространяться на последние методы subscribeBy(), поскольку она обрабатывается внутри метода subscribe() внутри цепочки .flatmap().

Код взят из этого проекта, который находится здесь: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt

Ответ 3

попробуйте использовать scan()

Flowable.fromArray(array).scan(...).subscribe(...)