Подтвердить что ты не робот

Spring webflux и чтение из базы данных

Spring 5 вводит стиль реактивного программирования для API для отдыха с webflux. Я новичок в этом сам, и мне было интересно, как обертывать синхронные вызовы в базу данных в Flux или Mono имеет смысл preformence-wise? Если да, то это способ сделать это:

@RestController
public class HomeController {

    private MeasurementRepository repository;

    public HomeController(MeasurementRepository repository){
        this.repository = repository;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
    }

}

Есть ли что-то вроде асинхронного CrudRepository? Я не мог найти его.

4b9b3361

Ответ 1

Один из вариантов - использовать альтернативные SQL-клиенты, которые полностью не блокируются. Некоторые примеры включают: https://github.com/mauricio/postgresql-async или https://github.com/finagle/roc. Конечно, ни один из этих драйверов официально не поддерживается поставщиками баз данных. Кроме того, функциональность намного менее привлекательна по сравнению со зрелыми абстракциями на основе JDBC, такими как Hibernate или jOOQ.

Альтернативная идея пришла ко мне из мира Scala. Идея состоит в том, чтобы отправлять блокирующие вызовы в изолированный ThreadPool, чтобы не смешивать блокирующие и неблокирующие вызовы вместе. Это позволит нам контролировать общее количество потоков и позволит процессору выполнять неблокирующие задачи в главном контексте выполнения с некоторыми потенциальными оптимизациями. Предполагая, что у нас есть реализация на основе JDBC, такая как Spring Data JPA, которая действительно блокирует, мы можем сделать ее выполнение асинхронным и отправить в выделенный пул потоков.

@RestController
public class HomeController {

    private final MeasurementRepository repository;
    private final Scheduler scheduler;

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler);
    }

}

Наш планировщик для JDBC должен быть настроен с использованием выделенного пула потоков с размером, равным количеству подключений.

@Configuration
public class SchedulerConfiguration {
    private final Integer connectionPoolSize;

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

}

Однако с этим подходом возникают трудности. Главное - управление транзакциями. В JDBC транзакции возможны только в одном соединении java.sql.Connection. Чтобы выполнить несколько операций в одной транзакции, они должны совместно использовать соединение. Если мы хотим сделать некоторые вычисления между ними, мы должны поддерживать связь. Это не очень эффективно, так как мы сохраняем ограниченное количество соединений в режиме ожидания при выполнении вычислений между ними.

Эта идея асинхронной оболочки JDBC не является новой и уже реализована в библиотеке Scala Slick 3. Наконец, неблокирующий JDBC может появиться в дорожной карте Java. Как было объявлено на JavaOne в сентябре 2016 года, и возможно, что мы увидим его на Java 10.

Ответ 2

Spring поддерживает интерфейс реактивного хранилища данных для Mongo и Cassandra.

Spring Data MongoDb Реактивный интерфейс

Spring Data MongoDB обеспечивает поддержку реактивного репозитория с реактивными типами Project Reactor и RxJava 1. Реактивный API поддерживает преобразование реактивных типов между реактивными типами.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

    Flux<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Flux<Person> findByLastname(Mono<String> lastname);

    Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Flux<Person> findWithTailableCursorBy();

}

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

    Observable<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Observable<Person> findByLastname(Single<String> lastname);

    Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Observable<Person> findWithTailableCursorBy();
}

Ответ 3

Основываясь на этом блоге, вы должны переписать свой фрагмент следующим образом

@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
    return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
           .subscribeOn(Schedulers.elastic());
}

Ответ 4

Получение Flux или Mono не обязательно означает, что они будут работать в выделенном потоке. Вместо этого большинство операторов продолжают работать в потоке, в котором выполнялся предыдущий оператор. Если не указано, самый верхний оператор (источник) сам выполняется в потоке, в котором был сделан вызов subscribe().

Если у вас есть блокирующие постоянные API-интерфейсы (JPA, JDBC) или сетевые API-интерфейсы, Spring MVC, по крайней мере, является лучшим выбором для распространенных архитектур. Технически выполнимо как с Reactor, так и с RxJava выполнять блокирующие вызовы в отдельном потоке, но вы не будете использовать большую часть неблокирующего веб-стека.

Итак... Как мне обернуть синхронный, блокирующий вызов?

Используйте Callable чтобы отложить выполнение. И вы должны использовать Schedulers.elastic потому что он создает выделенный поток для ожидания блокирующего ресурса, не связывая какой-либо другой ресурс.

  • Schedulers.immediate(): текущий поток.
  • Schedulers.single(): одиночная, многократно используемая нить.
  • Schedulers.newSingle(): отдельный поток для каждого вызова.
  • Schedulers.elastic(): Пул эластичных нитей. Он создает новые рабочие пулы по мере необходимости и повторно использует свободные. Например, это хороший выбор для работы по блокировке ввода/вывода.
  • Schedulers.parallel(): фиксированный пул рабочих, настроенный для параллельной работы.

    public Mono save() {Mono.fromCallable(() → blockingRepository.save()). subscribeOn (Schedulers.elastic()); } }