Подтвердить что ты не робот

Задача RxJava Async в Android

Я пытаюсь реализовать асинхронную задачу с помощью RxJava в Android. Я попробовал следующий код, и он не сработал. Он выполняется в потоке пользовательского интерфейса. Я использую следующую версию RxAndroid 0.24.0.

try {
            Observable.just(someMethodWhichThrowsException())
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(s -> onMergeComplete());
        } catch (IOException e) {
            e.printStackTrace();
        }

Однако для меня работает асинхронно.

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    someMethodWhichThrowsException();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                subscriber.onCompleted();
            }
        });
        observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();

Я пытаюсь понять следующее:  1. В чем разница между ними?  2. Какова наилучшая практика при создании асинхронных задач?

Спасибо заранее.

4b9b3361

Ответ 1

  • В чем разница между ними?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

Это эквивалентно следующему:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

Как вы видите, это сначала вызывает вызов синхронного метода, затем передает его на Observable.just, чтобы стать наблюдаемым.

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Этот метод, однако, будет запускать код в блоке call при подписке. Вы сказали, что хотите подписаться на новый поток (subscribeOn(Schedulers.newThread())), поэтому подписка происходит в новом потоке, и код, который запускается при подписке (блок call), запускается и в этом потоке. Это похоже на вызов Observable.defer.

  1. Какова наилучшая практика при создании асинхронных задач?

Хорошо, это вам и вашему желаемому поведению. Иногда вы хотите, чтобы асинхронный код начинал работать немедленно (в этом случае вы можете захотеть кэшировать его с использованием одного из операторов для этой цели). Я бы определенно рассмотрел возможность использования библиотеки Async Utils.

В других случаях вы хотите, чтобы он запускался только при подписке (это поведение в примерах здесь) - например, если есть побочные эффекты, или если вам все равно, когда он работает, и просто хотите используйте встроенные модули, чтобы получить что-то от потока пользовательского интерфейса. Дэн Лью упоминает, что Observable.defer очень удобно использовать старый код и получать его от потока пользовательского интерфейса во время преобразования в Rx.

Ответ 2

Используйте Async.start() из библиотеки RxJava Async Utils. Это вызовет функцию, которую вы предоставляете в другом потоке.

Пример:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

Как вы заметили, проверенные исключения должны быть завернуты в RuntimeExceptions.

См. также https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start