Подтвердить что ты не робот

ЗавершенныйFuture: Ожидание первого нормально возвращается?

У меня есть несколько CompletableFuture, и я хочу запустить их параллельно, ожидая первого, который возвращает обычно.

Я знаю, что могу использовать CompletableFuture.anyOf, чтобы ждать возвращения первого, но это вернет обычно или исключительно. Я хочу игнорировать исключения.

List<CompletableFuture<?>> futures = names.stream().map(
  (String name) ->
    CompletableFuture.supplyAsync(
      () ->
        // this calling may throw exceptions.
        new Task(name).run()
    )
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
    logger.info(any.get().toString());
} catch (Exception e) {
    e.printStackTrace();
}
4b9b3361

Ответ 1

Вы можете использовать следующий вспомогательный метод:

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    l.forEach(s -> s.thenAccept(complete));
    return f;
}

который вы можете использовать таким образом, чтобы продемонстрировать, что он будет игнорировать предыдущие исключения, но вернуть первое предоставленное значение:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
        () -> { throw new RuntimeException("failing immediately"); }
    ),
    CompletableFuture.supplyAsync(
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
          return "with 5s delay";
        }),
    CompletableFuture.supplyAsync(
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
          return "with 10s delay";
        })
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());

Одним из недостатков этого решения является то, что он никогда не завершится, если все фьючерсы будут завершены исключительно. Решение, которое будет обеспечивать первое значение, если есть успешное вычисление, но сбой исключительно, если нет успешных вычислений вообще, немного более важно:

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    CompletableFuture.allOf(
        l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
    return f;
}

Он использует тот факт, что исключительный обработчик allOf s вызывается только после того, как все фьючерсы завершены (в исключительных случаях или нет), и что будущее может быть выполнено только один раз (в противном случае отключение особых вещей, таких как obtrude…). Когда выполняется исключительно обработчик, любая попытка завершить будущее с результатом была выполнена, если таковой был, поэтому попытка его выполнить исключительно успешно, если не было предыдущего успешного завершения.

Его можно использовать точно так же, как и первое решение, и проявлять только другое поведение, если все вычисления не выполняются, например:

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
        () -> { throw new RuntimeException("failing immediately"); }
    ),
    CompletableFuture.supplyAsync(
        // delayed to demonstrate that the solution will wait for all completions
        // to ensure it doesn't miss a possible successful computation
        () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            throw new RuntimeException("failing later"); }
    )
);
CompletableFuture<String> c = anyOf(futures);
try { logger.info(c.join()); }
catch(CompletionException ex) { logger.severe(ex.toString()); }

В приведенном выше примере используется задержка, показывающая, что решение будет ждать всех завершений, когда успех не будет выполнен, тогда как этот пример на ideone продемонстрирует, как последующий успех будет превратить результат в успех. Обратите внимание, что из-за кэширования результатов Ideones вы можете не заметить задержку.

Обратите внимание, что в случае, если все фьючерсы терпят неудачу, нет никакой гарантии о том, какое из исключений будет сообщено. Поскольку он ожидает всех завершений в ошибочном случае, любой может сделать это до конечного результата.

Ответ 2

Учитывая, что:

  • Одной из основ философии Java является предотвращение или отвращение к плохой практике программирования.

    (В какой степени он был успешным в этом является предметом другой дискуссии, точка до сих пор стоит, что это, несомненно, является одной из основных задач на языке.)р >

  • Игнорирование исключений - очень плохая практика.

    Исключение всегда должно быть либо rethrown для слоя выше, либо обработано, или, по крайней мере, . В частности, исключение должно никогда не проглатываться молча.

  • Ошибки должны сообщаться как можно скорее.

    например, см. боли, которые проходит среда выполнения, чтобы обеспечить быстрые итераторы с отказом, которые вызывают исключение ConcurrentModificationException, если коллекция модифицируется во время итерации.

  • Игнорирование исключительно завершенной CompletableFuture означает, что a) вы не сообщаете об ошибке в кратчайшие возможные сроки, и b) вы, вероятно, планируете вообще не сообщать об этом.

  • Неспособность просто ждать первого не исключительного завершения и, вместо этого, быть обеспокоенным исключительными пополнениями, не налагает никакого существенного бремени, поскольку вы всегда можете удалить исключительно завершенный элемент из списка (в то время как в в то же время не забывайте сообщать о сбое, верно?) и повторите ожидание.

Поэтому я не удивлюсь, если искомая функция намеренно отсутствует в Java, и я был бы готов утверждать, что это по праву отсутствует.

(Извините, Сотириос, никакого канонического ответа.)

Ответ 3

Ну, это метод, который должен поддерживаться каркасом. Во-первых, я подумал CompletionStage.applyToEither делает что-то подобное, но оказывается, что это не так. Поэтому я придумал это решение:

public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
  final int count = stages.size();
  if (count <= 0) {
    throw new IllegalArgumentException("stages must not be empty");
  }
  final AtomicInteger settled = new AtomicInteger();
  final CompletableFuture<U> future = new CompletableFuture<U>();
  BiConsumer<U, Throwable> consumer = (val, exc) -> {
    if (exc == null) {
      future.complete(val);
    } else {
      if (settled.incrementAndGet() >= count) {
        // Complete with the last exception. You can aggregate all the exceptions if you wish.
        future.completeExceptionally(exc);
      }
    }
  };
  for (CompletionStage<U> item : stages) {
    item.whenComplete(consumer);
  }
  return future;
}

Чтобы увидеть это в действии, вот несколько способов использования:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class Main {
  public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
    final int count = stages.size();
    if (count <= 0) {
      throw new IllegalArgumentException("stages must not be empty");
    }
    final AtomicInteger settled = new AtomicInteger();
    final CompletableFuture<U> future = new CompletableFuture<U>();
    BiConsumer<U, Throwable> consumer = (val, exc) -> {
      if (exc == null) {
        future.complete(val);
      } else {
        if (settled.incrementAndGet() >= count) {
          // Complete with the last exception. You can aggregate all the exceptions if you wish.
          future.completeExceptionally(exc);
        }
      }
    };
    for (CompletionStage<U> item : stages) {
      item.whenComplete(consumer);
    }
    return future;
  }

  private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

  public static <U> CompletionStage<U> delayed(final U value, long delay) {
    CompletableFuture<U> future = new CompletableFuture<U>();
    worker.schedule(() -> {
      future.complete(value);
    }, delay, TimeUnit.MILLISECONDS);
    return future;
  }
  public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
    CompletableFuture<U> future = new CompletableFuture<U>();
    worker.schedule(() -> {
      future.completeExceptionally(value);
    }, delay, TimeUnit.MILLISECONDS);
    return future;
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("Started...");

    /*
    // Looks like applyToEither doesn't work as expected
    CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
    CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
    System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
    */

    try {
      List<CompletionStage<Integer>> futures = new ArrayList<>();
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
      futures.add(delayed(1, 1000));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
      futures.add(delayed(2, 500));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
      Integer value = firstCompleted(futures).toCompletableFuture().get();
      System.out.println("Completed normally: " + value);
    } catch (Exception ex) {
      System.out.println("Completed exceptionally");
      ex.printStackTrace();
    }

    try {
      List<CompletionStage<Integer>> futures = new ArrayList<>();
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
      futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
      Integer value = firstCompleted(futures).toCompletableFuture().get();
      System.out.println("Completed normally: " + value);
    } catch (Exception ex) {
      System.out.println("Completed exceptionally");
      ex.printStackTrace();
    }

    System.out.println("End...");
  }

}

Ответ 4

Будет ли это работать? Возвращает поток всех завершенных фьючерсов и возвращает один из них.

futures.stream()
  .filter(f -> {
    try{
      f.get();
      return true;
    }catch(ExecutionException | InterruptedException e){
      return false;
    }
    })
  .findAny();