Подтвердить что ты не робот

Можно ли использовать API Java 8 Streams для асинхронной обработки?

Я играл с CompletionStage/CompletableFuture в Java 8, чтобы выполнить асинхронную обработку, которая работает достаточно хорошо. Однако иногда мне нужна сцена для выполнения асинхронной обработки итератора/потока элементов, и, похоже, не существует способа сделать это.

В частности, Stream.forEach() имеет семантику, которая после вызова всех элементов была обработана. Я хотел бы получить то же самое, но вместо этого с помощью CompletionStage, например:

CompletionStage<Void> done = stream.forEach(...);
done.thenRun(...);

Если поток поддерживается асинхронным потоковым результатом, это будет намного лучше, чем ждать его завершения в самом коде выше.

Возможно ли построить это с помощью существующего Java 8 API? Обходные?

4b9b3361

Ответ 1

Насколько я знаю, API потоков не поддерживает асинхронную обработку событий. Похоже, что вы хотите что-то вроде Reactive Extensions для .NET, и есть порт Java, называемый RxJava, созданный Netflix.

RxJava поддерживает многие из тех же операций высокого уровня, что и потоки Java 8 (такие как карта и фильтр), и является асинхронным.

Обновить. В настоящее время существует инициатива реактивных потоков, и похоже, что JDK 9 будет включать поддержка хотя бы части его, хотя класс Flow.

Ответ 2

Как указано в @KarolKrol, вы можете сделать это с потоком CompletableFuture.

Существует библиотека, которая построена поверх потоков JDK8, чтобы облегчить работу с потоками CompletableFuture под названием cyclops-react.

Чтобы создать свои потоки, вы можете использовать API-интерфейс с быстрым протезом cyclops-реагировать или вы можете использовать простой-реагировать Stage s.

Ответ 3

cyclops-react (я являюсь автором этой библиотеки), предоставляет StreamUtils для обработки потоков. Одной из функций, которые он предоставляет, является futureOperations, которая обеспечивает доступ к стандартным операциям терминала Stream (а затем и к некоторым) с помощью твиста - Stream выполняется асинхронно, и результат возвращается внутри CompletableFuture..e.g

 Stream<Integer> stream = Stream.of(1,2,3,4,5,6)
                                       .map(i->i+2);
 CompletableFuture<List<Integer>> asyncResult =  StreamUtils.futureOperations(stream,
                                             Executors.newFixedThreadPool(1))
                                       .collect(Collectors.toList());

Существует также класс convience ReactiveSeq, который обертывает Stream и предоставляет те же функциональные возможности, с неплохим белым API

 CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6)
                                       .map(i->i+2)
                                       .futureOperations(
                                             Executors.newFixedThreadPool(1))
                                       .collect(Collectors.toList());

Как указал Адам cyclops-react FutureStreams предназначены для обработки данных асинхронно (путем смешивания фьючерсов и потоков вместе) - особенно подходит для многопоточных операций, которые включают блокирование ввода-вывода (например, чтение файлов, создание вызовов в режиме ожидания, вызов отдыха и т.д.).

Ответ 4

Можно создать поток, сопоставить каждый элемент с CompletionStage и собрать результаты с помощью CompletionStage.thenCombine(), но полученный код не будет более читабельным, а затем простым способом для этого.

    CompletionStage<Collection<Result>> collectionStage =
        CompletableFuture.completedFuture(
            new LinkedList<>()
        );

    for (Request request : requests) {
        CompletionStage<Result> resultStage = performRequest(request);
        collectionStage = collectionStage.thenCombine(
            resultStage,
            (collection, result) -> {
                collection.add(result);
                return collection;
            }
        );
    }

    return collectionStage;

Этот пример может быть легко преобразован в функциональный для каждого, не теряя удобочитаемости. Но использование потока reduce или collect требует дополнительного не очень тонкого кода.

Обновление: CompletableFuture.allOf и CompletableFuture.join предоставляют другой, более читаемый способ преобразования коллекции будущих результатов в будущий сбор результатов.