Подтвердить что ты не робот

Как обрабатывать ошибки при выполнении Flux.map()

Я пытаюсь выяснить, как обрабатывать ошибки при отображении элементов внутри потока.

Например, я разбираю строку CSV в одном из моих бизнес-POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

Некоторые из этих строк могут содержать ошибки, поэтому я получаю в журнале:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

Я читал в API некоторые методы обработки ошибок, но большинство ссылалось на возвращение "значения ошибки" или использование резервного Flux, как этот:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

Однако использование этого с помощью my myflux означает, что весь поток снова обрабатывается.

Итак, есть ли способ обрабатывать ошибки при обработке отдельных элементов (I.e игнорируя их/регистрировать их) и продолжать обрабатывать остальную часть потока?

ОБНОВЛЕНИЕ с обходным путем @akarnokd

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

Это работает как шарм, однако, как вы видите, код менее изящный, чем раньше. Разве API Flux не имеет никакого способа делать то, что делает этот код?

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
4b9b3361

Ответ 1

Вместо этого вам понадобится flatMap, которые позволяют вам возвращать пустую последовательность, если сбой обработки:

myflux.flatMap(v -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});

Ответ 2

Если вы хотите использовать методы Reactor 3 для работы с исключениями, вы можете использовать Mono.fromCallable.

flatMap(x -> 
    Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
        .flux()
        .flatMap(Flux::fromIterable)
        .onErrorResume(Flux::empty)
)

К сожалению, Flux.fromCallable отсутствует, поэтому при условии, что вызываемый объект возвращает список, вы должны вручную преобразовать его в Flux.

Ответ 3

В текущей версии Reactor 3 было добавлено немало методов. Таким образом, мы могли бы сделать что-то вроде этого:

Flux.onErrorResume(error -> { 
        System.out.println("Error decoding stock quotation: " + e);
        return Flux.empty();
    });

Смотрите больше информации о том, как обрабатывать ошибки здесь

Ответ 4

Вы можете использовать onErrorContinue. Это позволяет восстанавливаться после ошибок, удаляя элемент неисправности и продолжая работу с последующими элементами.