Подтвердить что ты не робот

Неощутимое отсутствие улучшения производительности с использованием RxJava Observables в веб-приложениях

Я выполняю некоторые тесты, чтобы оценить, есть ли реальное преимущество в использовании реактивного API на основе Observables вместо традиционных традиционных блокировок.

Весь пример доступен в Githug

Удивительно, что результаты показывают, что результаты расчета:

  • Лучшие: службы REST, которые возвращают Callable/DeferredResult, который завершает операции блокировки.

  • Не так уж плохо: блокировка служб REST.

  • Худший: службы REST, которые возвращают DeferredResult, результат которого задается RxJava Observable.

Это мой Spring WebApp:

Application

@SpringBootApplication
public class SpringNioRestApplication {

   @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringNioRestApplication.class, args);
    }
}

SyncController

@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data synchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public List<Data> getData(){
        return dataService.loadData();
    }
}

AsyncController: как с необработанными конечными точками с возможностью вызова и наблюдения

@RestController
@Api(value="", description="Synchronous data controller")
public class AsyncRestController {

    @Autowired
    private DataService dataService;

    private Scheduler scheduler;

    @Autowired
    private TaskExecutor executor;

     @PostConstruct
    protected void initializeScheduler(){
        scheduler = Schedulers.from(executor);
    }

    @RequestMapping(value="/async/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data asynchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public Callable<List<Data>> getData(){
        return ( () -> {return dataService.loadData();} );
    }

    @RequestMapping(value="/observable/data", method=RequestMethod.GET, produces="application/json")
     @ApiOperation(value = "Gets data through Observable", notes="Gets data asynchronously through Observable")
     @ApiResponses(value={@ApiResponse(code=200, message="OK")})
     public DeferredResult<List<Data>> getDataObservable(){
         DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
         Observable<List<Data>> dataObservable = dataService.loadDataObservable();
         dataObservable.subscribeOn(scheduler).subscribe( dr::setResult, dr::setErrorResult);
         return dr;
     }
}

DataServiceImpl

@Service
public class DataServiceImpl implements DataService{

    @Override
    public List<Data> loadData() {
        return generateData();
    }

    @Override
    public Observable<List<Data>> loadDataObservable() {
        return Observable.create( s -> {
            List<Data> dataList = generateData();
            s.onNext(dataList);
            s.onCompleted();
        });
    }

    private List<Data> generateData(){
        List<Data> dataList = new ArrayList<Data>();
        for (int i = 0; i < 20; i++) {
            Data data = new Data("key"+i, "value"+i);
            dataList.add(data);
        }
        //Processing time simulation
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return dataList;
    }
}

Я установил задержку Thread.sleep(500), чтобы увеличить время отклика службы.

Результаты тестов нагрузки:

Async with Callable: 700 rps, без ошибок

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data    
...
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 2839, requests per second: 568, mean latency: 500 ms
Requests: 6337, requests per second: 700, mean latency: 500 ms
Requests: 9836, requests per second: 700, mean latency: 500 ms
...
Completed requests:  41337
Total errors:        0
Total time:          60.002348360999996 s
Requests per second: 689
Total time:          60.002348360999996 s

Блокировка: около 404 трлн, но создает ошибки

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data    
...
Requests: 7683, requests per second: 400, mean latency: 7420 ms
Requests: 9683, requests per second: 400, mean latency: 9570 ms
Requests: 11680, requests per second: 399, mean latency: 11720 ms
Requests: 13699, requests per second: 404, mean latency: 13760 ms
...
Percentage of the requests served within a certain time
  50%      8868 ms
  90%      22434 ms
  95%      24103 ms
  99%      25351 ms
 100%      26055 ms (longest request)

 100%      26055 ms (longest request)

   -1:   7559 errors
Requests: 31193, requests per second: 689, mean latency: 14350 ms
Errors: 1534, accumulated errors: 7559, 24.2% of total requests

Async with Observable: не более 20 rps и быстрее получает ошибки

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 90, requests per second: 18, mean latency: 2250 ms
Requests: 187, requests per second: 20, mean latency: 6770 ms
Requests: 265, requests per second: 16, mean latency: 11870 ms
Requests: 2872, requests per second: 521, mean latency: 1560 ms
Errors: 2518, accumulated errors: 2518, 87.7% of total requests
Requests: 6373, requests per second: 700, mean latency: 1590 ms
Errors: 3401, accumulated errors: 5919, 92.9% of total requests 

Наблюдение выполняется с помощью corePoolSize 10, но увеличение его до 50 ничего не улучшило.

Что может быть объяснением?

UPDATE. Как было предложено akarnokd, я внес следующие изменения. Перемещено из Object.create в Object.fromCallable в службе и повторно использовало Планировщик в контроллере, но все же я получаю те же результаты.

4b9b3361

Ответ 1

Проблема была вызвана ошибкой программирования в какой-то момент. На самом деле пример в вопросе работает отлично.

Одно предупреждение, чтобы у других не было проблем: остерегайтесь использования Observable.just(func), func на самом деле называется созданием Observable. Таким образом, любой объект Thread.sleep будет блокировать вызывающий поток

@Override
public Observable<List<Data>> loadDataObservable() {
    return Observable.just(generateData()).delay(500, TimeUnit.MILLISECONDS);
}

private List<Data> generateData(){
    List<Data> dataList = new ArrayList<Data>();
    for (int i = 0; i < 20; i++) {
        Data data = new Data("key"+i, "value"+i);
        dataList.add(data);
    }
    return dataList;
}

Я начал обсуждение в группе RxJava Google, где они помогли мне разобраться.