Подтвердить что ты не робот

Обратная связь для RxJava

Я ищу лучший способ добиться простой системы Observable fallback для пустых результатов при использовании RxJava. Идея состоит в том, что если локальный запрос для набора данных приводит к нулевым элементам, тогда вместо него должен быть сделан резервный запрос (может быть сетевой вызов или что-то еще). В настоящее время мой код состоит из следующего:

Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     if (results.isEmpty()) {
       subscriber.onError(new Throwable("Empty results"));
     } else {
       // Process results...
     }
  }
}).onErrorResumeNext(/* Fallback Observable goes here */);

Хотя это работает, не имеет смысла бросать исключение для пустого набора результатов. Я заметил, что есть доступные условные операторы, такие как isEmpty, но похоже, что он меня не туда, где я хочу быть. Например, используя isEmpty...

localObservable = Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     for (Object obj : results) {
       // Process object
       subscriber.onNext(object);           
     }
     subscriber.onCompleted();
  }
});

localObservable.isEmpty()
  .switchMap(new Func1<Boolean, Observable<? extends Object>>() {
    @Override
    public Observable<? extends Object> call(Boolean isEmpty) {
      if (isEmpty) {
        // Return fallback Observable.
        return fallbackObservable;
      }

      // Return original Observable, which would mean running the local query again... Not desired.
      return localObservable;
    }
  });

Это почти приводит меня к тому, где я хочу быть, за исключением того, что localObservable, похоже, будет запускаться дважды в том случае, если есть непустые элементы, которые являются прерываниями транзакций.

4b9b3361

Ответ 1

Используйте оператор switchIfEmpty.

Пример использования:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // return no item
        //subscriber.onNext(...);
        System.out.println("Generating nothing :)");
        subscriber.onCompleted();
    }
}).switchIfEmpty(Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("Generating item");
        subscriber.onNext("item");
        subscriber.onCompleted();
    }
})).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext: " + s);
    }
});

Упрощенный ламами:

Observable.create(subscriber -> {
    // return no item
    //subscriber.onNext(...);
    System.out.println("Generating nothing :)");
    subscriber.onCompleted();
}).switchIfEmpty(Observable.create(subscriber -> {
    System.out.println("Generating item");
    subscriber.onNext("item");
    subscriber.onCompleted();
})).subscribe(
    s -> System.out.println("onNext: " + s),
    throwable -> System.out.println("onError"),
    () -> System.out.println("onCompleted")
);

Ответ 2

Я уверен, что есть более элегантное решение этой проблемы, но вы можете просто использовать flatMap в этом случае. Поскольку вы уже имеете дело с List<Object>, который возвращается из метода queryLocalDatabase(), вы можете сделать что-то вроде этого.

Observable.create(new Observable.OnSubscribe<List<String>>() {
    @Override
    public void call(Subscriber<? super List<String>> subscriber) {
        List<String> results = queryLocalDatabase();
        subscriber.onNext(results);
        subscriber.onCompleted();
    }

    private List<String> queryLocalDatabase() {
        return Arrays.asList();
    }

}).flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> list) {
        if (list.isEmpty()) {
            return getFallbackObservable();
        } else {
            return Observable.from(list);
        }
    }

    private Observable<String> getFallbackObservable() {
        return Observable.from("3", "4");
    }

}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext: " + s);
    }
});

Я заменил Object for String для этого кода.

Ответ 3

Я нашел более элегантное решение этой проблемы.

Observable<String> cachedObservable =
        Observable.from(Collections.<String>emptyList()) // Swap this line with the below one to test the different cases
        //Observable.from(Arrays.asList("1", "2"))
                .cache();

cachedObservable.isEmpty().switchMap(isEmpty -> {
    if (isEmpty)
        return Observable.from("3", "4");
    else
        return cachedObservable;
}).subscribe(System.out::println);

Ключевым претендентом здесь является cache, который воспроизводит исходные испускаемые значения.

Имейте в виду javadoc о cache.

Вы жертвуете возможностью отказаться от подписки на источник при использовании кэша Observer, поэтому будьте осторожны, чтобы не использовать этот Observer on Observables, который испускает бесконечное или очень большое количество элементов, которые будут использовать память.

Ответ 4

После некоторых исследований и изучения ответов других людей, я считаю, что использование Transformer - это наиболее надежное решение, например...

Observable.from(Collections.emptyList())
  .compose(new Observable.Transformer<List<Object>, List<Object>>() {
    @Override
    public Observable<List<Object> call(Observable<List<Object>> source) {
      boolean isEmpty = observable.isEmpty().toBlocking().first();
      if (isEmpty) {
        return backupObservable();
      } else {
        return source;
      }
    }
  });

Ответ 5

Другой вариант. В моем случае мне нужно сначала попробовать одну кэшированную запись, а затем лениво попробовать другую, если нет:

public void test() {
    String cacheRowKey = "row";
    Observable<String> = cacheLookup(cacheRowKey, "newVersion").switchIfEmpty(
        Observable.defer(() -> cacheLookup(cacheRowKey, "legacyVersion").switchIfEmpty(
        Observable.defer(() -> onMiss(cacheRowKey;
}

private Observable<String> cacheLookup(String key, String version) {
    // Delegate to the real cache - will return effectively
    // Observable.just(xxx) or Observable.empty()
    ...
}

private Observable<String> onMiss(String key) {
    // do the expensive lookup to then populate the cache
    ...
}
  • Сначала выполняется поиск в кеше для ключа и "новой" версии
  • При пропуске кеша повторите попытку кэша для "старой версии"
  • В пропущенных кешках для "новых" и "устаревших" версий выполните дорогостоящий поиск (предположительно, чтобы заполнить кеш).

Все из них лениво выполняются по требованию.

Ответ 6

Вы также можете связать свой Observable:

Observable<Integer> o1 = Observable.empty();
Observable<Integer> o2 = Observable.empty();
Observable<Integer> o3 = Observable.just(3);
Observable<Integer> o4 = Observable.just(4);
Observable<Integer> os = Observable.concat(o1, o2, o3, o4);
Integer v = os.toBlocking().first();  // returns 3