Подтвердить что ты не робот

Лучшая практика для обработки onError и непрерывной обработки

Я новичок в RxJava, но я интегрирую его в проект, над которым я работаю, чтобы помочь мне изучить его. Я столкнулся с вопросом о лучших практиках.

У меня вопрос о том, как обрабатывать onError от предотвращения остановки обработки Observable.

Вот настройка:

У меня есть список userIds для каждого из них, который я хотел бы сделать 2 или более сетевых запросов. Если какой-либо из сетевых запросов завершился с ошибкой для userid, тогда этот идентификатор пользователя не будет обновлен и может быть пропущен. Это не должно препятствовать обработке других пользователей. У меня есть решение, но оно связано с вложенными подписчиками (см. Второй блок кода). Одна из проблем, которые я вижу, заключается в том, что если каждый вызов не выполняется, нет возможности короткого замыкания и прекращения останова от попадания сетевого ресурса даже после обнаружения определенного порогового числа.

Есть ли лучший способ сделать это?

В традиционном коде:

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw an GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw an GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}

Проблема:

псевдокод:

userid -> network requests -> result 
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

Ниже приведен рабочий пример списка userIds и для каждого 2 запроса информации. Если вы запустите его, вы увидите, что он потерпит неудачу (см. Ниже исходный код).

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0(){

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

Вывод:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

Вложенное решение для подписки:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

Вывод:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

Как вы можете видеть, только отдельные пользовательские идентификаторы, которые не выполнили, остановили свою индивидуальную обработку, но остальные обработчики были обработаны.

Просто ищите совета, посмотрите, имеет ли это решение смысл, а если нет, то что лучше всего.

Спасибо, Alex

4b9b3361

Ответ 1

Лучшей практикой является использование mergeDelayError(), которые объединяют несколько Observables в один, позволяя без ошибок Observables продолжать до распространения ошибок.

mergeDelayError ведет себя так же, как merge. Исключением является то, что один из объединяемых Observables завершается с уведомлением onError. Если это происходит при слиянии, объединенный Observable немедленно выдает уведомление onError и завершается. С другой стороны, mergeDelayError не будет сообщать об ошибке до тех пор, пока он не предоставит какие-либо другие невоспроизводимые Observables, что он объединяет шанс закончить выдачу своих элементов, и он будет испускать их сами, и будет только завершаться с уведомление onError, когда все остальные объединенные Observables завершены.

Ответ 2

Поскольку вы хотите проигнорировать ошибку, вы можете попробовать onErrorResumeNext(Observable.<String>empty());. Например,

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);

Ответ 3

Как новичок Rx, я также искал простой ответ для обработки исключений отдельно и продолжал обработку следующего события, но не смог найти ответы на что спрашивал @Дэниел Сегато. Вот одно решение, в котором у вас нет контроля:

Вышеприведенные примеры предполагают, что у вас есть контроль над наблюдаемыми, т.е. один из способов - отложить ошибки до конца, используя mergeDelayError ИЛИ вернуть неизвестное пустое событие. Наблюдаемое для каждого события как Наблюдаемое отдельно с помощью слияния.

Если это ошибка источника, вы можете использовать лифт для создания другого наблюдаемого, который в основном обрабатывает значение текущего Наблюдаемого изящно. Класс SimpleErrorEmitter имитирует неограниченный поток, который может иногда выходить из строя.

Observable.create(new SimpleErrorEmitter())
        // transform errors to write to error stream
        .lift(new SuppressError<Integer>(System.err::println))
        .doOnNext(System.out::println)  // and everything else to console
        .subscribe();


class SimpleErrorEmitter implements OnSubscribe<Integer> {
@Override
public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);

    subscriber.onError(new FooException());

    subscriber.onNext(3);
    subscriber.onNext(4);

    subscriber.onCompleted();
}

class SuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public SuppressError(Action1<Throwable> onError) {
    this.onError = onError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
    return new Subscriber<T>(t1) {
        @Override
        public void onNext(T t) {
            t1.onNext(t);
        }
        @Override
        public void onError(Throwable e) { // handle errors using a separate function
            onError.call(e);
        }
        @Override
        public void onCompleted() {
            t1.onCompleted();
        }
    };
}

Если это ошибка обработки подписчика, которая может попробовать/поймать и продолжить изящно

    Observable<Integer> justInts = justStrs.map((str) -> {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return null;
        }
    });

Я все еще пытаюсь найти простой способ просто повторить попытку или задержать попытку неудачного события и продолжить со следующего.

    Observable<String> justStrs = Observable
            .just("1", "2", "three", "4", "5")  // or an unbounded stream
            // both these retrying from beginning 
            // when you delay or retry, if they are of known exception type
            .retryWhen(ex -> ex.flatMap(eachex -> {
                // for example, if it is a socket or timeout type of exception, try delaying it or retrying it
                if (eachex instanceof RuntimeException) {
                    return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate());
                }
                return Observable.error(eachex);
            }))
            // or simply retry 2 times
            .retry(2) // if it is the source problem, attempt retry
            .doOnError((ex) -> System.err.println("On Error:" + ex));

Ссылка: https://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc

Ответ 4

Глядя на Observable.flatMap source:

return merge(map(func));

Если вы хотите, чтобы все возможные пользователи были обработаны, вы можете продолжить модифицированную версию flatMap:

Observable.mergeDelayError(userIdObservable.map(userInfoFunc))

Далее, если вы скажете:

Если какой-либо из сетевых запросов завершился с ошибкой для userid, тогда этот userid не будут обновлены и могут быть пропущены

Тогда не используйте:

return Observable.mergeDelayError(info1, info2);

Потому что это вызовет запросы как info1, так и info2, даже если один из них терпит неудачу.

Скорее пойдите с:

return Observable.merge(info1, info2);

Когда info1 и info2 подписываются на один и тот же поток, они будут выполняться последовательно, поэтому, если сбой info1 невозможен, info2 никогда не будет запрашиваться. Поскольку info1 и info2 ограничены I/O, я предполагаю, что вы хотите запускать их параллельно:

getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io());
getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());

Это должно значительно ускорить вашу обработку

Весь код:

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        return Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        })
        .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.merge(info1, info2);
            }
        }));
        //rest is the same
    }
}