Подтвердить что ты не робот

Использование параллельного потока для возврата самого быстрого поставленного значения

У меня есть набор поставщиков, которые все дают один и тот же результат, но с другой (и переменной) скоростью.

Я хочу элегантный способ начать работу с поставщиками одновременно, и как только один из них получит значение, верните его (отбрасывая другие результаты).

Я попытался использовать параллельные потоки и Stream.findAny() для этого, но он всегда блокируется до тех пор, пока не будут созданы все результаты.

Здесь unit test, демонстрирующий мою проблему:

import org.junit.Test;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class RaceTest {

    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value

        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}

Результат теста заключается в том, что последнее утверждение не выполняется, так как весь тест занимает около 10 секунд.

Что я здесь делаю неправильно?

4b9b3361

Ответ 1

В этом случае вам лучше использовать Callable вместо Supplier (такую ​​же функциональную подпись) и использовать старый добрый API concurrency, который существует с Java 5:

Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
        Thread.sleep(10_000);
        return "slow";
    }
);

ExecutorService es=Executors.newCachedThreadPool();
try {

    String result = es.invokeAny(suppliers);
    System.out.println(result);

} catch (InterruptedException|ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();

Обратите внимание, как весь "запустить все и вернуть самый быстрый" становится единственным вызовом метода...

У него также есть бонус отмены/прерывания всех ожидающих операций, как только один результат будет доступен, поэтому медленная операция обычно не будет ждать здесь всего десяти секунд (ну, в большинстве случаев, поскольку время не является детерминированным).

Ответ 2

Используемый вами код является недетерминированным. Цитирование Javadoc findAny():

Поведение этой операции явно недетерминировано; он может свободно выбирать любой элемент в потоке.

Вы можете использовать CompletionService и передать все задачи на него. Затем CompletionService.take() вернет Future первой завершенной задачи.

long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

Ответ 3

Stream API не подходит для таких задач, которые не гарантируют, когда задачи будут завершены. Лучшим решением было бы использовать CompletableFuture:

long start = System.currentTimeMillis();
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(CompletableFuture::supplyAsync)
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

Обратите внимание, что он все равно не может запускать всех поставщиков параллельно, если общий FJP не имеет уровня parallelism. Чтобы исправить это, вы можете создать свой собственный пул, который имеет необходимый уровень parallelism:

long start = System.currentTimeMillis();
ForkJoinPool fjp = new ForkJoinPool(suppliers.size());
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(s -> CompletableFuture.supplyAsync(s, fjp))
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
fjp.shutdownNow();