Подтвердить что ты не робот

Как обрабатывать исключения, брошенные наблюдателем в Next в RxJava?

Рассмотрим следующий пример:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

Это выводит номера от 1 до 5, а затем печатает исключение.

Я хочу добиться того, чтобы наблюдатель оставался подписанным и продолжал работать после выброса исключения, т.е. печатал все числа от 1 до 10.

Я попытался использовать retry() и другие различные операторы обработки ошибок, но, как сказано в документации, их целью является обработка ошибок испускаемый наблюдаемым.

Самое простое решение - просто обернуть все тело onNext в блок try-catch, но это не похоже на хорошее решение для меня. В подобном вопросе Rx.NET предлагаемое решение состояло в том, чтобы создать метод расширения, который мог бы сделать обертку, создав прокси-наблюдаемое. Я попытался переделать его:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

Это ничего не меняет, потому что сам RxJava переносит подписчика на SafeSubscriber. Использование unsafeSubscribe, чтобы обойти это, похоже, не является хорошим решением.

Что я могу сделать для решения этой проблемы?

4b9b3361

Ответ 1

Это общий вопрос, который возникает при обучении Rx.

TL; DR

Ваше предложение разместить логику обработки исключений в подписчике предпочтительнее создания общей наблюдаемой обертки.

Объяснение

Помните, что Rx - это толкание событий подписчикам.

Из наблюдаемого интерфейса видно, что на самом деле ничто не может наблюдать ничего об этом, кроме того, сколько времени они занимали для обработки события, или информации, содержащейся в любых заброшенных исключениях.

Общая оболочка для обработки исключений подписчиков и переноса сообщений для этого подписчика - плохая идея.

Почему? Ну, наблюдаемый должен только знать, что абонент теперь находится в неизвестном состоянии отказа. Проводить отправку событий в этой ситуации неразумно - возможно, например, абонент находится в состоянии, при котором каждое событие с этой точки вперед будет генерировать исключение и займет некоторое время, чтобы сделать это.

Как только абонент выкинул исключение, для наблюдаемого есть только два жизнеспособных курса действий:

  • Повторите выброс исключения
  • Внедрить общую обработку для регистрации сбоя и прекратить отправку событий (любого вида) и очистить любые ресурсы из-за этого подписчика и продолжить с любыми оставшимися подписками.

Конкретная обработка исключений абонентов будет плохим выбором дизайна; это создало бы неуместную поведенческую связь между подписчиком и наблюдаемым. Поэтому, если вы хотите быть устойчивым к плохим подписчикам, два варианта выше - действительно разумный предел ответственности самого наблюдаемого.

Если вы хотите, чтобы ваш абонент был устойчивым и выполнял его, вам следует полностью привязать его к логике обработки исключений, предназначенной для обработки определенных исключений, которые вы знаете, как восстановить (и, возможно, для обработки исключений переходных процессов, регистрации, повторной логики, размыкание цепи и т.д.).

Только сам абонент будет иметь контекст, чтобы понять, подходит ли он для получения дальнейших событий перед сбоем.

Если ваша ситуация требует разработки многоразовой логики обработки ошибок, поставьте себя в сознание обертывания обработчиков событий наблюдателя, а не наблюдаемого, - и будьте осторожны, чтобы не слепо переносить события передачи перед сбоем. Release It!, пока не написано о Rx, является интересным классиком для разработки программного обеспечения, о котором можно сказать в этом последнем пункте. Если вы его не читали, я очень советую.