Подтвердить что ты не робот

Vert.x Подождите ответа на несколько сообщений

В vert.x я могу отправить сообщение на другую строку и "ждать асинхронно" для ответа.

Проблема заключается в следующем: я хочу отправить сообщения на несколько вершин и сделать асинхронный обработчик, который будет вызываться, когда все вершины отвечают.

Возможно ли это или есть лучший дизайн для достижения этой функциональности?

EDIT:

Предположим, что у меня есть вершина A, которая отправляет сообщения на вершины B, C и D. Каждая вершина (B, C, D) что-то делает с сообщением и возвращает A некоторые данные. Вершина A затем получает ответ от B, C, D и делает что-то со всеми данными. Проблема в том, что у меня есть обработчик для каждого отправляемого сообщения (один для A, один для B, один для C), я хочу, чтобы один обработчик вызывался, когда все ответы были получены.

4b9b3361

Ответ 1

Как и в случае Vert.x 3.2, в документации объясняется как несинхронизировать координаты с помощью Future и CompositeFuture.

Итак, скажем, вы хотите сделать два вызова send по шине событий и сделать что-то, когда оба успешно выполнены:

Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());

Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());

CompositeFuture.all(f1, f2).setHandler(result -> {
  // business as usual
});

В качестве аргументов могут передаваться до 6 фьючерсов или, альтернативно, их можно передавать как список.

Ответ 2

Лучший подход для этого - использовать Reactive Extensions, реализованный Netflix Rx.Java и предлагаемый Модуль RxVertx.

огромное количество операторов позволяет вам делать такие вещи, как "зачистка" результатов нескольких асинхронных вызовов в новый результат и делать то, что вы хочу с ним.

У меня есть простая демонстрация, доступная на GitHub, которая содержит:

final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);

Этот фрагмент показывает, как два наблюдаемых, исходящих из двух асинхронных взаимодействий между шинами событий, получают "zipped" (т.е. объединены) в один окончательный ответ HTTP.