Подтвердить что ты не робот

Объедините список Observables и дождитесь завершения всех завершенных

TL; DR Как преобразовать Task.whenAll(List<Task>) в RxJava?

В моем существующем коде используются болты для создания списка асинхронных задач и ожидания до завершения всех этих задач перед выполнением других действий. По существу, он создает List<Task> и возвращает один Task, который помечен как завершенный, когда все задачи в списке завершены, в соответствии с примером на Болты.

Я хочу заменить Bolts на RxJava, и я предполагаю, что этот метод создания списка задач async (размер неизвестен заранее) и завершение их всех в один Observable возможен, но я не знаю, как это сделать.

Я пробовал смотреть merge, zip, concat и т.д.... но не могу работать над List<Observable>, который я бы наращивал, поскольку все они, похоже, ориентированы на работу всего за два Observables за раз, если я правильно понимаю документы.

Я пытаюсь узнать RxJava и все еще очень новичок в этом, так что простите меня, если это очевидный вопрос или объясняется в документах где-то; Я попробовал поиск. Любая помощь будет высоко оценена.

4b9b3361

Ответ 1

Похоже, вы ищете оператор Zip.

Существует несколько способов его использования, поэтому рассмотрим пример. Скажем, у нас есть несколько простых наблюдаемых разных типов:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

Самый простой способ подождать их - это примерно так:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Обратите внимание, что в zip-функции параметры имеют конкретные типы, которые соответствуют типам наблюдаемых, которые были заархивированы.

Возможно также перепечатывание списка наблюдаемых, либо непосредственно:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

... или путем переноса списка в Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

Однако в обоих случаях zip-функция может принимать только один параметр Object[], поскольку типы наблюдаемых в списке не известны заранее, а также их число. Это означает, что zip-функция должна будет проверять количество параметров и соответственно их применять.

Несмотря на это, все вышеприведенные примеры в конечном итоге напечатают 1 Blah true

РЕДАКТИРОВАТЬ: При использовании Zip убедитесь, что Observables, зашифрованный, испускает одинаковое количество элементов. В приведенных выше примерах все три наблюдаемых объекта испускали один элемент. Если бы мы изменили их на что-то вроде этого:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Тогда 1, Blah, True и 2, Hello, True будут единственными элементами, переданными в zip-функцию (ы). Элемент 3 никогда не будет застегнут, поскольку остальные наблюдаемые завершены.

Ответ 2

Вы можете использовать flatMap, если у вас есть состав динамических задач. Что-то вроде этого:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Еще один хороший пример параллельного выполнения

Примечание. Я не знаю ваших требований к обработке ошибок. Например, что делать, если только одна задача выходит из строя. Я думаю, вы должны проверить этот сценарий.

Ответ 3

Из предложенных предложений zip() фактически объединяет наблюдаемые результаты друг с другом, которые могут быть или не быть желаемыми, но не были заданы в этом вопросе. В этом вопросе все, что требовалось, - это выполнение каждой из операций, либо по одной, либо параллельно (что не было указано, но пример связанных болтов касался параллельного выполнения). Кроме того, zip() будет завершен немедленно после завершения любой из наблюдаемых, что нарушает требования.

Для параллельного выполнения Observables, flatMap() , представленный в другом ответе, подойдет, но merge() было бы проще. Обратите внимание, что слияние завершится при ошибке любого из наблюдаемых объектов. Если вы предпочитаете отложить выход до завершения всех наблюдаемых объектов, вам следует обратить внимание на mergeDelayError().

Один за другим, я думаю, что Observable.concat() статический метод должен быть использован. Его Javadoc состояния, такие как это:

concat (java.lang.Iterable> sequence) Разлагает итерируемое из наблюдаемых в одно наблюдаемое, одно за другим, не чередуя их

это звучит как то, что вы ищете, если вы не хотите параллельного выполнения.

Кроме того, если вас интересует только выполнение задачи, а не возвращаемые значения, вам, вероятно, следует обратиться к Completable вместо Observable.

TLDR: для выполнения задач по одному и события oncompletion, когда они завершены, я думаю, что Completable.concat() лучше всего подходит. Для параллельного выполнения Completable.merge() или Completable.mergeDelayError() звучит как решение. Первый из них немедленно остановится на любой ошибке в любом завершаемом файле, второй выполнит их все, даже если у одного из них будет ошибка, и только после этого сообщит об ошибке.

Ответ 4

С Котлином

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

Важно установить тип для аргументов функции, иначе у вас будут ошибки компиляции

Последнее изменение типа аргумента с номером аргумента: БиФункция для 2 Функция 3 для 3 Function4 для 4 ...

Ответ 5

Вероятно, вы посмотрели на оператор zip, который работает с 2 Observables.

Существует также статический метод Observable.zip. Он имеет одну форму, которая должна быть полезной для вас:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Вы можете проверить javadoc для более.

Ответ 6

Я пишу код вычисления в Kotlin с JavaRx Observables и RxKotlin. Я хочу наблюдать список наблюдаемых, которые нужно заполнить, и тем самым дать мне обновление с прогрессом и последним результатом. В конце он возвращает лучший результат вычисления. Дополнительным требованием было одновременное использование Observables для использования всех моих процессорных ядер. Я закончил с этим решением:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}