Подтвердить что ты не робот

Таймаут со значением по умолчанию в Java 8 CompletedFuture

Предположим, что у меня есть асинхронное вычисление, например:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

Есть ли хороший способ предоставить значение по умолчанию для foo, если поставщик async истекает в соответствии с определенным таймаутом? В идеале такая функциональность также попытается отменить медленного поставщика. Например, существует ли стандартная библиотечная функциональность, похожая на следующий гипотетический код:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

Или, может быть, даже лучше:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

Я знаю о get(timeout, unit), но мне интересно, есть ли более стандартный стандартный способ применения тайм-аута в асинхронном и реактивном режиме, как это предлагается в коде выше.

EDIT: здесь решение, которое вдохновлено Java 8: Обязательно проверяет обработку исключений в лямбда-выражениях. Почему обязательный, а не факультативный?, но, к сожалению, он блокирует поток. Если мы полагаемся на createFoo(), чтобы асинхронно проверять тайм-аут и бросать свой собственный тайм-аут, он будет работать без блокировки потока, но возлагает больше бремя на создателя поставщика и будет по-прежнему иметь затраты на создание исключения (которое может быть дорогим без "быстрого броска" )

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(s -> doStuffWithFoo(foo));
4b9b3361

Ответ 1

CompletableFuture.supplyAsync - это просто вспомогательный метод, который создает для вас CompletableFuture и отправляет задачу в пул ForkJoin.

Вы можете создать свой собственный источникAsync с такими требованиями, как это:

private static final ScheduledExecutorService schedulerExecutor = 
                                 Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService = 
                                 Executors.newCachedThreadPool();


public static <T> CompletableFuture<T> supplyAsync(
        final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
        T defaultValue) {

    final CompletableFuture<T> cf = new CompletableFuture<T>();

    // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
    // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
    // Using Executors.newCachedThreadPool instead in the example
    // submit task
    Future<?> future = executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    //schedule watcher
    schedulerExecutor.schedule(() -> {
        if (!cf.isDone()) {
            cf.complete(defaultValue);
            future.cancel(true);
        }

    }, timeoutValue, timeUnit);

    return cf;
}

Создание CompletableFuture с помощью этого помощника так же просто, как использование статического метода в CompletableFuture:

    CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
            TimeUnit.SECONDS, "default");

Чтобы проверить это:

    a = supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e1) {
            // ignore
        }
        return "hi";
    }, 1, TimeUnit.SECONDS, "default");

Ответ 3

У DZone есть хорошая статья, как решить эту проблему: https://dzone.com/articles/asynchronous-timeouts

Я не уверен в авторских правах на код, поэтому я не могу его скопировать. Решение очень похоже на решение от Dane White, но оно использует пул потоков с одним потоком плюс schedule(), чтобы избежать потери потока только для ожидания таймаута.

Он также выдает TimeoutException вместо возврата по умолчанию.

Ответ 4

Думаю, вам всегда понадобится дополнительный мониторинг потока, когда его время будет предоставлено значение по умолчанию. Я, вероятно, поеду на маршрут с двумя вызовами supplyAsync, с по умолчанию включенными в API-интерфейс утилиты, связанными с acceptEither. Если вы предпочитаете обертывать своего Поставщика, тогда вы можете использовать API-интерфейс утилиты, который делает для вас "любой" вызов:

public class TimeoutDefault {
    public static <T> CompletableFuture<T> with(T t, int ms) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) { }
            return t;
        });
    }

    public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
        return () -> CompletableFuture.supplyAsync(supplier)
            .applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
    }
}

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(Example::createFoo)
        .acceptEither(
            TimeoutDefault.with("default", 1000),
            Example::doStuffWithFoo);

CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
        .thenAccept(Example::doStuffWithFoo);

Ответ 5

Нет стандартного библиотечного метода для построения CompletableFuture, поставляемого со значением после таймаута. Тем не менее, очень просто сворачивать свои ресурсы с минимальными ресурсами:

private static final ScheduledExecutorService EXECUTOR
        = Executors.newSingleThreadScheduledExecutor();

public static <T> CompletableFuture<T> delayedValue(final T value,
                                                    final Duration delay) {
    final CompletableFuture<T> result = new CompletableFuture<>();
    EXECUTOR.schedule(() -> result.complete(value),
                      delay.toMillis(), TimeUnit.MILLISECONDS);
    return result;
}

Его можно использовать с методами < either <<22 > :

  • accceptEither, acceptEitherAsync
  • applyToEither, applyToEitherAsync
  • runAfterEither, runAfterEitherAsync

В одном приложении используется кешированное значение, если вызов удаленной службы превышает некоторый порог задержки:

interface RemoteServiceClient {
    CompletableFuture<Foo> getFoo();
}

final RemoteServiceClient client = /* ... */;
final Foo cachedFoo = /* ... */;
final Duration timeout = /* ... */;

client.getFoos()
    .exceptionally(ignoredException -> cachedFoo)
    .acceptEither(delayedValue(cachedFoo, timeout),
        foo -> /* do something with foo */)
    .join();

Если вызов удаленного клиента завершается исключительно (например, SocketTimeoutException), мы можем быстро выйти из строя и использовать кешированное значение немедленно.

CompletableFuture.anyOf(CompletableFuture<?>...) можно комбинировать с этим примитивом delayedValue, чтобы обернуть a CompletableFuture с помощью указанной семантики:

@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> withDefault(final CompletableFuture<T> cf,
                                                   final T defaultValue,
                                                   final Duration timeout) {
    return (CompletableFuture<T>) CompletableFuture.anyOf(
        cf.exceptionally(ignoredException -> defaultValue),
        delayedValue(defaultValue, timeout));
}

Это значительно упрощает приведенный выше пример вызова удаленной службы:

withDefault(client.getFoos(), cachedFoo, timeout)
    .thenAccept(foo -> /* do something with foo */)
    .join();

CompletableFuture более точно обозначаются promises, поскольку они отделяют создание Future от его завершения. Обязательно используйте выделенные пулы потоков для работы с большим процессором. Чтобы создать CompletableFuture для дорогостоящего вычисления, вы должны использовать перегрузку CompletableFuture#supplyAsync(Supplier, Executor), так как перегрузка #supplyAsync(Supplier) по умолчанию равна общему ForkJoinPool. Возвращенный CompletableFuture не смог отменить свою задачу, так как эта функциональность не отображается интерфейсом Executor. В более общем случае зависимые CompletableFuture не отменяют своих родителей, например. cf.thenApply(f).cancel(true) не отменяет cf. Я бы рекомендовал придерживаться Future, возвращенного ExecutorService, если вам нужна эта функциональность.