Подтвердить что ты не робот

Список <Будущее> в будущее <Список> последовательность

Я пытаюсь преобразовать List<CompletableFuture<X>> в CompletableFuture<List<T>>. Это очень полезно, когда у вас много асинхронных задач, и вам нужно получить результаты всех из них.

Если какой-либо из них не работает, окончательное будущее не выполняется. Вот как я реализовал:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

Чтобы запустить его:

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

Если какой-либо из них терпит неудачу, он терпит неудачу. Он дает результат, как ожидалось, даже если есть миллион фьючерсов. У меня есть проблема: скажите, есть ли более 5000 фьючерсов, и если какой-либо из них терпит неудачу, я получаю StackOverflowError:

Исключение в потоке "pool-1-thread-2611" java.lang.StackOverflowError   в java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)   в java.util.concurrent.CompletableFuture $ThenCompose.run(CompletableFuture.java:1487)   в java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)   в java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)   в java.util.concurrent.CompletableFuture $ThenCompose.run(CompletableFuture.java:1487)

Что я делаю неправильно?

Примечание. Вышеупомянутое возвращенное будущее выходит из строя, когда какое-либо будущее не выполняется. Принятый ответ также должен принять этот момент.

4b9b3361

Ответ 1

Используйте CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

Несколько комментариев о вашей реализации:

Использование вами .thenComposeAsync, .thenApplyAsync и .thenCombineAsync, вероятно, не соответствует вашим ожиданиям. Эти методы ...Async запускают предоставленную им функцию в отдельном потоке. Итак, в вашем случае вы вызываете добавление нового элемента в список для запуска в предоставленном исполнителе. Нет необходимости помещать легковесные операции в исполнитель кэшированных потоков. Не используйте методы thenXXXXAsync без уважительной причины.

Кроме того, reduce не следует использовать для накопления в изменяемые контейнеры. Даже если он может работать правильно, когда поток является последовательным, он потерпит неудачу, если поток будет сделан параллельным. Чтобы выполнить изменчивое сокращение, используйте вместо этого .collect.

Если вы хотите завершить все вычисления исключительно сразу после первого сбоя, выполните следующее в вашем методе sequence:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

Если, кроме того, вы хотите отменить оставшиеся операции при первом сбое, добавьте exec.shutdownNow(); сразу после result.completeExceptionally(ex);. Это, конечно, предполагает, что exec существует только для этого одного вычисления. Если этого не произойдет, вам придется повторить цикл и отменить каждый оставшийся Future в отдельности.

Ответ 2

Как Миша указал, вы злоупотребляете операциями …Async. Кроме того, вы составляете сложную цепочку операций, моделируя зависимость, которая не отражает вашу программную логику:

  • вы создаете задание x, которое зависит от первого и второго задания вашего списка.
  • вы создаете задание x + 1, которое зависит от задания x и третьего задания вашего списка.
  • вы создаете задание x + 2, которое зависит от задания x + 1 и 4-го задания вашего списка.
  • ...
  • вы создаете задание x + 5000, которое зависит от задания x + 4999 и последнего задания вашего списка.

Затем, отменяя (явно или из-за исключения), это рекурсивно составленное задание может быть выполнено рекурсивно и может завершиться неудачей с помощью StackOverflowError. Это зависит от реализации.

Как уже показанный Мишей, существует метод allOf, который позволяет вам смоделировать свое первоначальное намерение, определить одно задание, которое зависит от всех заданий вашего списка.

Однако стоит отметить, что даже это необязательно. Поскольку вы используете неограниченный исполнитель пула потоков, вы можете просто отправить асинхронное задание, собирая результаты в список, и все готово. Ожидание завершения подразумевается путем запроса результата каждой работы в любом случае.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

Использование методов для компоновки зависимых операций важно, когда количество потоков ограничено, и задания могут порождать дополнительные асинхронные задания, чтобы избежать того, чтобы ожидания выполняли кражу потоков из заданий, которые должны быть выполнены в первую очередь, но здесь и не здесь.

В этом конкретном случае одно задание, просто повторяющееся над этим большим количеством необходимых заданий и ожидающее при необходимости, может быть более эффективным, чем моделирование этого большого количества зависимостей, и каждое задание уведомляет зависимую работу о завершении.

Ответ 3

Вы можете получить библиотеку Spotify CompletableFutures и использовать метод allAsList. Я думаю, что это вдохновило метод Гуава Futures.allAsList.

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

А вот простая реализация, если вы не хотите использовать библиотеку:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}

Ответ 4

Чтобы добавить к принятому ответу @Misha, он может быть дополнительно расширен как коллекционер:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Теперь вы можете:

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());

Ответ 5

Пример последовательности операций с использованием thenCombine on CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 

Если вы не возражаете против использования сторонних библиотек cyclops-react (я являюсь автором), есть набор методов утилиты для CompletingFutures (и опции, потоки и т.д.)

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);

Ответ 6

Отказ от ответственности: Это не будет полностью ответить на первоначальный вопрос. Ему не хватит части "провалить все, если не получится". Однако я не могу ответить на реальный, более общий вопрос, потому что он был закрыт как дубликат этого: Java 8 CompletableFuture.allOf(...) с Collection или List. Поэтому я отвечу здесь:

Как конвертировать List<CompletableFuture<V>> в CompletableFuture<List<V>> использовать потоковый API Java 8?

Резюме: Используйте следующее:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

Пример использования:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Полный пример:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}

Ответ 7

В дополнение к библиотеке Spotify Futures вы можете попробовать найти мой код здесь: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (имеет зависимости от других классов в одном пакете)

Он реализует логику, чтобы вернуть "по крайней мере N из M" CompletionStage-s с политикой, сколько ошибок она допустила. Существуют удобные методы для всех/любых случаев, плюс политика отмены для оставшихся фьючерсов, плюс код имеет дело с CompletionStage-s (интерфейс), а не CompletableFuture (конкретный класс).