Подтвердить что ты не робот

Как объединить 3 или более CompletionStages?

Если есть 2 CompletionStages, я могу объединить их с методом thenCombine:

CompletionStage<A> aCompletionStage = getA();
CompletionStage<B> bCompletionStage = getB();
CompletionStage<Combined> combinedCompletionStage =
    aCompletionStage.thenCombine(bCompletionStage, (aData, bData) -> combine(aData, bData));

Если у меня есть 3 или более CompletionStages, я могу создать цепочку методов thenCombine, но я должен использовать временные объекты для передачи результатов. Например, вот решение, использующее Pair и Triple из пакета org.apache.commons.lang3.tuple:

CompletionStage<A> aCompletionStage = getA();
CompletionStage<B> bCompletionStage = getB();
CompletionStage<C> cCompletionStage = getC();
CompletionStage<D> dCompletionStage = getD();

CompletionStage<Combined> combinedDataCompletionStage =
        aCompletionStage.thenCombine(bCompletionStage, (Pair::of))
                .thenCombine(cCompletionStage, (ab, c) ->
                        Triple.of(ab.getLeft(), ab.getRight(), c))
                .thenCombine(dCompletionStage, (abc, d) ->
                        combine(abc.getLeft(), abc.getMiddle(), abc.getRight(), d));

Есть ли лучший способ комбинировать результаты с несколькими CompletionStages?

4b9b3361

Ответ 1

Единственный способ объединить несколько этапов, которые хорошо масштабируются с растущим числом этапов, заключается в использовании CompletableFuture. Если ваш CompletionStage arent CompletableFuture, вы все равно можете преобразовать их, используя .toCompletableFuture():

CompletableFuture<A> aCompletionStage = getA().toCompletableFuture();
CompletableFuture<B> bCompletionStage = getB().toCompletableFuture();
CompletableFuture<C> cCompletionStage = getC().toCompletableFuture();
CompletableFuture<D> dCompletionStage = getD().toCompletableFuture();

CompletionStage<Combined> combinedDataCompletionStage = CompletableFuture.allOf(
    aCompletionStage, bCompletionStage, cCompletionStage, dCompletionStage)
    .thenApply(ignoredVoid -> combine(
        aCompletionStage.join(), bCompletionStage.join(),
        cCompletionStage.join(), dCompletionStage.join()) );

Это содержит больше шаблонов, чем объединение двух этапов с помощью thenCombine, но шаблонная плита не растет при добавлении к ней дополнительных этапов.


Обратите внимание, что даже с вашим оригинальным подходом thenCombine вам не нужно a Triple, a Pair достаточно:

CompletionStage<Combined> combinedDataCompletionStage =
    aCompletionStage.thenCombine(bCompletionStage, (Pair::of)).thenCombine(
        cCompletionStage.thenCombine(dCompletionStage, Pair::of),
        (ab, cd) -> combine(ab.getLeft(), ab.getRight(), cd.getLeft(), cd.getRight()));

Тем не менее, он не масштабируется, если вы хотите объединить больше этапов.


Совместное решение (относительно сложности) может быть:

CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
    a -> bCompletionStage.thenCompose(b -> cCompletionStage.thenCompose(
        c -> dCompletionStage.thenApply(d -> combine(a, b, c, d)))));

Это проще в своей структуре, но все еще не масштабируется с большим количеством этапов.

Ответ 2

Третий ответ Холгера может быть немного короче:

CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
    a -> bCompletionStage.thenCompose(
        b -> cCompletionStage.thenCombine(dCompletionStage,
            (c, d) -> combine(a, b, c, d))));

Ответ 3

Я думаю, что вы должны использовать промежуточный объект, но один из ваших собственных вместо использования Pair и Tuple

public R method() {
    CompletableFuture<A> aFuture = getAFuture();
    CompletableFuture<B> bFuture = getBFuture();
    CompletableFuture<C> cFuture = getCFuture();
    CompletableFuture<D> dFuture = getDFuture();

    return CompletableFuture.completedFuture(new WellNamedResultHolder())
            .thenCombineAsync(aFuture, WellNamedResultHolder::withAResult)
            .thenCombineAsync(bFuture, WellNamedResultHolder::withBResult)
            .thenCombineAsync(cFuture, WellNamedResultHolder::withCResult)
            .thenCombineAsync(dFuture, WellNamedResultHolder::withDResult)
            .thenApplyAsync(this::combineAllTheResults);
}

private static class WellNamedResultHolder {
    private A aResult;
    private B bResult;
    private C cResult;
    private D dResult;

    // Getters

    public WellNamedResultHolder withAResult(final A aResult) {
        this.aResult = aResult;
        return this;
    }

    public WellNamedResultHolder withBResult(final B bResult) {
        this.bResult = bResult;
        return this;
    }

    public WellNamedResultHolder withCResult(final C cResult) {
        this.cResult = cResult;
        return this;
    }

    public WellNamedResultHolder withDResult(final D dResult) {
        this.dResult = dResult;
        return this;
    }
}

Фактическая форма держателя результата, очевидно, может измениться в соответствии с вашими потребностями, предоставляя вам большую гибкость. Вы также отвечаете за то, что происходит после завершения этого будущего. Хотя здесь больше шаблонов, вы получаете код, который более наглядно описывает происходящее (какой ломбок может привести в порядок).

Ответ 4

Вы спрашивали о "3 или более", если они есть в списке как CompletableFutures (см. Другие ответы), вы можете использовать этот удобный метод:

private static <T> CompletableFuture<List<T>> join(List<CompletableFuture<T>> executionPromises) {
    CompletableFuture<Void> joinedPromise = CompletableFuture.allOf(executionPromises.toArray(CompletableFuture[]::new));
    return joinedPromise.thenApply(voit -> executionPromises.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}

Он преобразует ваш "список фьючерсов" в "будущее для списка результатов".

Ответ 5

У меня была похожая проблема, но у меня было более 3 завершаемых фьючерсов, поэтому, основываясь на ответе Хольгера, я сделал небольшую универсальную утилиту.

public static <T, R> CompletableFuture<R> allOf(List<CompletableFuture<T>> args, Function<List<T>, R> combiner) {
    final Queue<CompletableFuture<T>> queue = new LinkedList<>();
    for (CompletableFuture<T> arg : args) {
        queue.add(arg);
    }
    return aggregator(queue, new ArrayList<>(), combiner);
}

private static <T, R> CompletableFuture<R> aggregator(Queue<CompletableFuture<T>> queue, List<T> arg,
        Function<List<T>, R> combiner) {
    if (queue.size() == 2)
        return queue.poll().thenCombine(queue.poll(), (c, d) -> {
            arg.add(c);
            arg.add(d);
            return combiner.apply(arg);
        });
    return queue.poll().thenCompose(data -> {
        arg.add(data);
        return aggregator(queue, arg, combiner);
    });
}