Подтвердить что ты не робот

Как использовать AsyncRestTemplate для одновременного выполнения нескольких вызовов?

Я не понимаю, как эффективно использовать AsyncRestTemplate для совершения внешних вызовов службы. Для кода ниже:

class Foo {

    public void doStuff() {
        Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
                url1, String.class);
        String response1 = future1.get();

        Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                url2, String.class);
        String response2 = future2.get();

        Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
                url3, String.class);
        String response3 = future3.get();
    }
}

В идеале я хочу выполнить все 3 вызова одновременно и обрабатывать результаты, как только они будут выполнены. Однако каждый внешний вызов службы не выбирается до тех пор, пока get() не будет вызван, а get() будет заблокирован. Так разве это не побеждает цель AsyncRestTemplate? Я мог бы также использовать RestTemplate.

Итак, я не понимаю, как я могу заставить их работать одновременно?

4b9b3361

Ответ 1

Просто не вызывайте блокировку get() перед отправкой всех ваших асинхронных вызовов:

class Foo {
  public void doStuff() {
    ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
    ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

    String response1 = future1.get();
    String response2 = future2.get();
    String response3 = future3.get();
  }
}

Вы можете делать как отправка, так и get в циклах, но обратите внимание, что текущий сбор результатов неэффективен, поскольку он застрял в следующем незавершенном будущем.

Вы можете добавить все фьючерсы в коллекцию и протестировать ее, проверяя каждое будущее для неблокирования isDone(). Когда этот вызов возвращает true, вы можете вызвать get().

Таким образом, ваш массовый сбор результатов будет оптимизирован, а не ждет следующего медленного будущего результата в порядке вызова get() s.

Лучше все же вы можете зарегистрировать обратные вызовы (промежутки времени) в каждом ListenableFuture, возвращенные AccyncRestTemplate, и вам не нужно беспокоиться о циклическом анализе потенциальных результатов.

Ответ 2

Если вам не нужно использовать "AsyncRestTemplate", я бы предложил использовать RxJava. RxJava zip оператор - это то, что вы ищете. Проверьте код ниже:

private rx.Observable<String> externalCall(String url, int delayMilliseconds) {
    return rx.Observable.create(
            subscriber -> {
                try {
                    Thread.sleep(delayMilliseconds); //simulate long operation
                    subscriber.onNext("response(" + url + ") ");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
    );
}

public void callServices() {
    rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread());
    rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3)
            .subscribeOn(Schedulers.newThread())
            .subscribe(response -> System.out.println("done with: " + response));
}

Все запросы к внешним службам будут выполняться в отдельных потоках, когда последний вызов будет завершен, функция преобразования (в примере простая конкатенация строк) будет применена, и результат (конкатенированная строка) будет заменен на "zip" наблюдаемый.

Ответ 3

Что я понимаю по вашему вопросу? У вас есть предопределенный асинхронный метод, и вы пытаетесь сделать это методом асинхронно, используя класс RestTemplate.

Я написал метод, который поможет вам вызывать ваш метод асинхронно.

 public void testMyAsynchronousMethod(String... args) throws Exception {
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick of multiple, asynchronous lookups
        Future<String> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);;
        Future<String> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
        Future<String> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

        // Wait until they are all done
        while (!(future1 .isDone() && future2.isDone() && future3.isDone())) {
            Thread.sleep(10); //10-millisecond pause between each check
        }

        // Print results, including elapsed time
        System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
    }

Ответ 4

Возможно, вы захотите использовать класс CompletableFuture (javadoc).

  • Преобразуйте свои вызовы в CompletableFuture. Например.

    final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    });
    
  • Следующий вызов CompletableFuture::allOf с тремя новыми созданными фьючерсами.

  • Вызвать метод join() на результат. После того, как окончательное окончательное решение будет разрешено, вы можете получить результаты из каждого отдельного завершенного будущего, которое вы создали на шаге 3.

Ответ 5

Я не думаю, что любой из предыдущих ответов действительно достигнет parallelism. Проблема с ответом @diginoise заключается в том, что он фактически не достигает parallelism. Как только мы назовем get, мы заблокированы. Учтите, что вызовы очень медленные, поэтому future1 занимает 3 секунды, а future2 2 секунды и future3 3 секунды. При вызове 3 get один за другим, мы в конечном итоге ожидаем 3 + 2 + 3 = 8 секунд. @Vikrant Kashyap также отвечает на while (!(future1 .isDone() && future2.isDone() && future3.isDone())). Кроме того, цикл while - довольно уродливый вид кода для 3 фьючерсов, что, если у вас есть больше? Ответ @lkz использует другую технологию, чем вы просили, и даже тогда я не уверен, что zip собирается выполнить эту работу. Из Observable Javadoc:

zip применяет эту функцию в строгой последовательности, поэтому первый элемент излучаемый новым Observable, будет результатом функции применяется к первому элементу, испускаемому каждым из источников Observables; второй элемент, излучаемый новым Наблюдаемым, будет результатом функция, применяемая ко второму элементу, испускаемому каждым из этих Наблюдаемые; и т.д.

Благодаря широкой популярности Spring, они очень стараются поддерживать обратную совместимость и при этом иногда компрометируют API. AsyncRestTemplate методы, возвращающие ListenableFuture, являются одним из таких случаев. Если они привязаны к Java 8+, вместо него можно использовать CompletableFuture. Зачем? Поскольку мы не будем иметь дело с пулами потоков напрямую, у нас нет хорошего способа узнать, когда все функции ListenableFutures завершены. CompletableFuture имеет метод allOf, который создает новый CompletableFuture, который завершается, когда все заданные CompletableFutures завершаются. Поскольку мы не имеем этого в ListenableFuture, нам придется импровизировать. Я не компилировал следующий код, но должно быть ясно, что я пытаюсь сделать. Я использую Java 8, потому что в конце 2016 года.

// Lombok FTW
@RequiredArgsConstructor
public final class CounterCallback implements ListenableFutureCallback<ResponseEntity<String>> {
  private final LongAdder adder;

  public void onFailure(Throwable ex) {
    adder.increment();
  }
  public void onSuccess(ResponseEntity<String> result) {
    adder.increment();
  }
}

ListenableFuture<ResponseEntity<String>> f1 = asyncRestTemplate
        .getForEntity(url1, String.class);
f1.addCallback(//);
// more futures

LongAdder adder = new LongAdder();
ListenableFutureCallback<ResponseEntity<String>> callback = new CounterCallback(adder);
Stream.of(f1, f2, f3)
  .forEach {f -> f.addCallback(callback)}

for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) {
  Thread.sleep(1000);
}
// either all futures are done or we're done waiting
Map<Boolean, ResponseEntity<String>> futures = Stream.of(f1, f2, f3)
  .collect(Collectors.partitioningBy(Future::isDone));

Теперь у нас есть Map, для которого futures.get(Boolean.TRUE) предоставит нам все завершенные фьючерсы, а futures.get(Boolean.FALSE) предоставит нам те, которые этого не сделали. Мы хотим отменить те, которые не были завершены.

Этот код делает несколько вещей, которые важны при параллельном программировании:

  • Он не блокируется.
  • Он ограничивает операцию до некоторого максимально допустимого времени.
  • Он четко разделяет успешные и неудачные случаи.

Ответ 6

Я думаю, вы здесь недооцениваете кое-что. Когда вы вызываете метод getForEntity, запросы уже запускаются. Когда вызывается метод get() будущего объекта, вы просто ожидаете завершения запроса. Поэтому, чтобы уволить все эти три запроса в одном и том же субсектонике, вам просто нужно сделать:

// Each of the lines below will fire an http request when it executed
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(url1, String.class);
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(url2, String.class);
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(url3, String.class);

После того как все эти коды будут запущены, все запросы уже запущены (скорее всего, в одном и том же подсетоде). Тогда вы можете делать все, что хотите, между тем. Как только вы вызываете какой-либо метод get(), вы ожидаете завершения каждого запроса. Если они уже завершены, он сразу же вернется.

// do whatever you want in the meantime
// get the response of the http call and wait if it not completed
String response1 = future1.get();
String response2 = future2.get();
String response3 = future3.get();