Я пытаюсь преобразовать List<CompletableFuture<X>>
в CompletableFuture<List<T>>
. Это очень полезно, когда у вас много асинхронных задач, и вам нужно получить результаты всех из них.
Если какой-либо из них не работает, окончательное будущее не выполняется. Вот как я реализовал:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
Чтобы запустить его:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
Если какой-либо из них терпит неудачу, он терпит неудачу. Он дает результат, как ожидалось, даже если есть миллион фьючерсов. У меня есть проблема: скажите, есть ли более 5000 фьючерсов, и если какой-либо из них терпит неудачу, я получаю StackOverflowError
:
Исключение в потоке "pool-1-thread-2611" java.lang.StackOverflowError в java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) в java.util.concurrent.CompletableFuture $ThenCompose.run(CompletableFuture.java:1487) в java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) в java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) в java.util.concurrent.CompletableFuture $ThenCompose.run(CompletableFuture.java:1487)
Что я делаю неправильно?
Примечание. Вышеупомянутое возвращенное будущее выходит из строя, когда какое-либо будущее не выполняется. Принятый ответ также должен принять этот момент.