Подтвердить что ты не робот

Ожидание списка Будущего

У меня есть метод, который возвращает List фьючерсов

List<Future<O>> futures = getFutures();

Теперь я хочу подождать, пока все фьючерсы будут успешно завершены или какие-либо из задач, выход которых будет возвращен будущим, выдает исключение. Даже если одна задача выдает исключение, нет смысла ждать других фьючерсов.

Простым подходом было бы

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Но проблема здесь заключается в том, что, например, 4-е будущее выдает исключение, тогда я буду ждать, когда первые 3 фьючерса будут доступны.

Как это решить? Будет ли обратная помощь защелками в любом случае? Я не могу использовать Future isDone, потому что в java-документе говорится

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
4b9b3361

Ответ 1

Вы можете использовать CompletionService для получения фьючерсов, как только они будут готовы, и если один из них выдает исключение, отмените обработку. Что-то вроде этого:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Я думаю, что вы можете улучшить, чтобы отменить все еще выполняющиеся задачи, если одна из них выдаст ошибку.

РЕДАКТИРОВАТЬ: я нашел более полный пример здесь: http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/

Ответ 2

Если вы используете Java 8, то вы можете сделать это проще с CompletableFuture и CompletableFuture.allOf, который применяет обратный вызов только после того, как все предоставленные CompletableFutures будут выполнены.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

Ответ 3

Вы можете использовать ExecutorCompletionService. В документации даже есть пример для вашего конкретного случая использования:

Предположим, что вы хотели бы использовать первый ненулевой результат набора задач, игнорируя все исключения, которые встречаются, и отменяя все остальные задачи, когда первый готов:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

Важно отметить, что ecs.take() получит первую завершенную задачу, а не только первую поданную. Таким образом, вы должны получить их в порядке завершения выполнения (или выброса исключения).

Ответ 4

Использовать CompletableFuture в Java 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

Ответ 5

Если вы используете Java 8 и не хотите манипулировать CompletableFuture s, я написал инструмент для извлечения результатов для List<Future<T>> с помощью потоковой передачи. Ключ в том, что вам запрещено map(Future::get), как он бросает.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

Для этого требуется AggregateException, который работает как С#

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

Этот компонент действует точно так же, как С# Task.WaitAll. Я работаю над вариантом, который делает то же самое, что и CompletableFuture.allOf (эквивалентно Task.WhenAll)

Причина, по которой я это сделал, заключается в том, что я использую Spring ListenableFuture и не хочу переносить на CompletableFuture, несмотря на то, что это более стандартный способ

Ответ 6

Возможно, это помогло бы (ничто не заменило бы сырой поток, да!) Я предлагаю запустить каждого парня Future с выделенным потоком (они идут параллельно), а затем когда-либо одна из полученной ошибки, он просто сигнализирует менеджеру (Handler class).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Я должен сказать, что приведенный выше код будет ошибкой (не проверял), но я надеюсь, что смогу объяснить это решение. пожалуйста, попробуйте.

Ответ 7

CompletionService возьмет ваши Callables с помощью метода.submit(), и вы можете получить вычисляемые фьючерсы с помощью метода.take().

Нельзя забывать об окончании службы ExecutorService, вызывая метод.shutdown(). Также вы можете вызвать этот метод только тогда, когда вы сохранили ссылку на службу исполнителя, поэтому обязательно сохраните ее.

Пример кода. Для фиксированного количества рабочих элементов, которые будут обрабатываться параллельно:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Пример кода. Для динамического количества рабочих элементов, которые будут обрабатываться параллельно:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

Ответ 8

 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };

Ответ 9

У меня есть служебный класс, который содержит эти:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Как только вы это сделаете, используя статический импорт, вы можете просто подождать все фьючерсы, как это:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

Вы также можете собрать все их результаты, как это:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Просто пересматриваю мой старый пост и замечаю, что у тебя было еще одно горе:

Но проблема здесь в том, что если, например, 4-е будущее выдает исключение, тогда я буду без необходимости ждать, когда будут доступны первые 3 фьючерса.

В этом случае простое решение состоит в том, чтобы сделать это параллельно:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

Таким образом, первое исключение, хотя и не остановит будущее, сломает оператор forEach, как в последовательном примере, но, поскольку все ожидают параллельно, вам не придется ждать завершения первых 3.

Ответ 10

В случае, если вы хотите объединить список CompletableFutures, вы можете сделать это:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

Для более подробной информации о Future & CompletableFuture, полезные ссылки:
1. Будущее: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/