Подтвердить что ты не робот

Rxjava Android, как использовать Zip-оператор

У меня возникли проблемы с пониманием оператора zip в RxJava для моего проекта Android. проблема Мне нужно отправить сетевой запрос для загрузки видео Затем мне нужно отправить сетевой запрос, чтобы загрузить изображение, чтобы пойти с ним наконец, мне нужно добавить описание и использовать ответы от предыдущих двух запросов, чтобы загрузить URL-адреса местоположения видео и изображения вместе с описанием на мой сервер.

Я предположил, что zip-оператор был бы идеальным для этой задачи, поскольку я понял, что мы могли бы взять ответ двух наблюдаемых (запросы видео и изображений) и использовать их для моей последней задачи. Но я не могу понять, как это происходит.

Я ищу кого-то, чтобы ответить, как это можно сделать концептуально с помощью некоторого кода psuedo. Спасибо вам

4b9b3361

Ответ 1

Оператор Zip строго пар испускает элементы из наблюдаемых. Он ожидает, что оба (или более) элемента будут отправлены, а затем слейт их. Так что да, это будет подходящим для ваших нужд.

Я использовал бы Func2, чтобы связать результат с первыми двумя наблюдаемыми. Обратите внимание, что этот подход будет проще, если вы используете Retrofit, так как его интерфейс api может вернуть наблюдаемый. В противном случае вам нужно будет создать свой собственный наблюдаемый.

// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb, 

   new Func2<String, String, MyResult>() {

      @Override
      public MyResult call(String movieUploadResponse, 
          String picUploadResponse) {
            // analyze both responses, upload them to another server
            // and return this method with a MyResult type
          return myResult;
         }
      }
)
// continue chaining this observable with subscriber
// or use it for something else

Ответ 2

Шаг за шагом: → Позволяет сначала определить наш объект Retrofit для доступа к API Githubs, а затем установить два наблюдаемых для двух указанных выше сетевых запросов:

Retrofit repo = new Retrofit.Builder()
        .baseUrl("https://api.github.com")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build();

Observable<JsonObject> userObservable = repo
        .create(GitHubUser.class)
        .getUser(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

Observable<JsonArray> eventsObservable = repo
        .create(GitHubEvents.class)
        .listEvents(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

Интерфейсы Retrofit достаточно просты:

public interface GitHubUser {
  @GET("users/{user}")
  Observable<JsonObject> getUser(@Path("user") String user);
}

public interface GitHubEvents {
  @GET("users/{user}/events")
  Observable<JsonArray> listEvents(@Path("user") String user);
}

В последнее время мы используем метод RxJavas zip для объединения наших двух наблюдаемых и дождаемся их завершения до создания нового Observable.

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
  @Override
  public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
    return new UserAndEvents(jsonObject, jsonElements);
  }
});

Что такое UserAndEvents? Простое POJO для объединения двух объектов:

public class UserAndEvents {
  public UserAndEvents(JsonObject user, JsonArray events) {
    this.events = events;
    this.user = user;
  }

  public JsonArray events;
  public JsonObject user;
}

Наконец, позвоните по методу подписки на наш новый объединенный Observable:

combined.subscribe(new Subscriber<UserAndEvents>() {
          ...
          @Override
          public void onNext(UserAndEvents o) {
            // You can access the results of the 
            // two observabes via the POJO now
          }
        });

Ответ 3

Маленький example:

Observable<String> stringObservable1 = Observable.just("Hello", "World");
Observable<String> stringObservable2 = Observable.just("Bye", "Friends");

Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() {
    @Override
    public String apply(@NonNull String s, @NonNull String s2) throws Exception {
        return s + " - " + s2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

Это напечатает:

Hello - Bye
World - Friends

Ответ 4

Здесь у меня есть пример, который я использовал с помощью Zip в асинхронном режиме, на случай, если вам любопытно

      /**
 * Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
  */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async in:", start, result));
}

/**
 * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
 */
@Test
public void testZip() {
    long start = System.currentTimeMillis();
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                         .concat(s3))
              .subscribe(result -> showResult("Sync in:", start, result));
}


public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}

public Observable<String> obString() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obString1() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obString2() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

Здесь вы можете увидеть больше примеров https://github.com/politrons/reactive

Ответ 5

Я искал простой ответ о том, как использовать Zip-оператор, и что делать с Observables, которые я создаю, чтобы передать их ему, мне было интересно, следует ли мне звонить subscribe() для каждого наблюдаемого или нет, не из этих ответов было просто найти, я должен был понять это сам, так что вот простой пример использования Zip-оператора на 2 Observables:

@Test
public void zipOperator() throws Exception {

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4);
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes);

    Observable<String> lettersObservable = Observable.fromIterable(letters);

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
            .subscribe(printMergedItems());
}

@NonNull
private BiFunction<Integer, String, String> mergeEmittedItems() {
    return new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer index, String letter) throws Exception {
            return "[" + index + "] " + letter;
        }
    };
}

@NonNull
private Consumer<String> printMergedItems() {
    return new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    };
}

печатный результат:

[0] a
[1] b
[2] c
[3] d
[4] e

окончательные ответы на вопросы, что в моей голове были

Наблюдаемые, передаваемые методу zip(), просто должны быть созданы только, им не нужно иметь никаких подписчиков, но их создание достаточно... если вы хотите, чтобы какие-либо наблюдаемые выполнялись в планировщике, вы может указать это для этого Observable... Я также попробовал оператор zip() в Observables, где они должны ждать результата, а Расходная часть zip() была вызвана только тогда, когда оба результата, где они готовы (что является ожидаемым поведением)

Ответ 6

Оператор

zip позволяет вам составить результат из результатов двух разных наблюдаемых.

Вам нужно будет дать лямбду, которая создаст результат из данных, излучаемых каждым наблюдаемым.

Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> {
        return new Response("description", movie.getName(), pic.getUrl());

});