Подтвердить что ты не робот

Использование RxJava и Okhttp

Я хочу запросить url, используя okhttp в другом потоке (например, поток IO) и получить Response в основном потоке Android, но я не знаю, как создать Observable.

4b9b3361

Ответ 1

Сначала добавьте RxAndroid к вашим зависимостям, а затем создайте свой Observable следующим образом:

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });

Он запросит ваш url в другом потоке (io thread) и заметит его в основном потоке андроида.

И, наконец, когда вы покидаете экран, используйте subsribtion.unsubscribe(), чтобы избежать утечки памяти.

Когда вы используете Observable.create, вы должны написать много кода шаблона, также вы должны обрабатывать подписку самостоятельно. Лучшей альтернативой является использование defer. Форма документа:

не создавать Observable до тех пор, пока наблюдатель не подпишет и не создаст новый наблюдаемый для каждого наблюдателя

Оператор Defer ждет, пока наблюдатель не присоединится к нему, а затем он генерирует Observable, как правило, с наблюдаемым factoryфункция. Он делает это заново для каждого абонента, поэтому, хотя каждый абонент может подумать, что он подписывается на тот же Наблюдаемый, на самом деле каждый абонент получает свою собственную индивидуальную последовательность.

Как упоминалось Марцин Козиньски, вам просто нужно сделать это:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

Ответ 2

Легче и безопаснее использовать Observable.defer() вместо Observable.create():

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

Таким образом, вы можете отказаться от подписки и противодавления. Здесь отличный пост Дэна Лью о create() и defer().

Если вы хотите пойти по маршруту Observable.create(), тогда он должен больше походить на эту библиотеку с isUnsubscribed() вызовами, посыпанными повсюду. И я считаю, что это все еще не справляется с противодавлением.

Ответ 3

Я понимаю, что это сообщение немного устарело, но есть новый и более удобный способ сделать это сейчас

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }

Дополнительная информация: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/