Подтвердить что ты не робот

Конфигурация асинхронного процессора Spring Batch для лучшей производительности

У меня возникла проблема с созданием асинхронного процессора в Spring Batch. Мой процессор получает ID от reader и создает объект на основе ответа от вызова SOAP. Иногда для 1 входа (ID) должно быть, например, 60-100 SOAP вызовов, а иногда и просто 1. Я пытался сделать многопоточный шаг, который он обрабатывал, например, 50 входов, но это было бесполезно, потому что 49 потоков выполнили свою работу за 1 секунду и были заблокирован, ожидая этого, который выполнял 60-100 SOAP вызовов. Теперь я использую AsyncItemProcessor + AsyncItemWriter но это решение работает медленно для меня. Поскольку мой ввод (IDs) большой, около 25 тыс. Элементов, прочитанных из БД, я хотел бы запустить ~ 50-100 входов в момент времени.

Вот моя конфигурация:

@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DatabaseConfig databaseConfig;
    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }


    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter(){
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";

        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);

        return reader;
    }
}

Как я могу увеличить свое приложение? Может быть, я делаю что-то неправильно? Любая обратная связь приветствуется;)

@Edit: для ввода идентификаторов: от 1 до 100 я хочу, например, 50 потоков, которые выполняют процессор. Я хочу, чтобы они не блокировали друг друга: Thread1 обрабатывает входной сигнал "1" в течение 2 минут, и в это время я хочу, чтобы Thread2 обрабатывал ввод "2", "8", "64", которые являются небольшими и выполняются за несколько секунд.

@Edit2: Моя цель: у меня есть 25k ID в базе данных, я читаю их с JpaPagingItemReader и каждый идентификатор обрабатывается процессором. Каждый элемент не зависит друг от друга. Для каждого ID я делаю SOAP вызов в 0-100 раз в цикле, а затем создаю Object, который я передаю в Writer и сохраняю в базе данных. Как я могу получить лучшую производительность для такой задачи?

4b9b3361

Ответ 1

Вы должны разделить свою работу. Добавьте секционированный шаг следующим образом:

@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilder.get("partitionedOrderStep1")
            .partitioner(orderStep1)
            .partitioner("orderStep1", new SimplePartitioner())
            .taskExecutor(taskExecutor())
            .gridSize(10)  //Number of concurrent partitions
            .build();
}

Затем используйте этот шаг в определении задания. Вызов.gridSize() настраивает количество разделов, которые будут выполняться одновременно. Если какой-либо из ваших объектов Reader, Processor или Writer является работоспособным, вам необходимо аннотировать их с помощью @StepScope.

Ответ 2

@KCrookedHand: Я имел дело с подобным сценарием, мне пришлось прочитать пару тысяч и мне нужно позвонить в SOAP Service (я ввел это в itemReader) для соответствия критериям.

Моя конфигурация выглядит ниже, в основном у вас есть пара вариантов для параллельной обработки, а два из них - подход "Разделение" и "Клиентский сервер". Я выбрал разделение, потому что у меня будет больше контроля над тем, сколько разделов мне нужно на основе моих данных.

Пожалуйста, ThreadPoolTaskExecutor, как @MichaelMinella, упомянутый ниже для выполнения Step-Execution с использованием тарелки, где это применимо.

<batch:step id="notificationMapper">
            <batch:partition partitioner="partitioner"
                step="readXXXStep" />
        </batch:step>
    </batch:job>


    <batch:step id="readXXXStep">
        <batch:job ref="jobRef" job-launcher="jobLauncher"
            job-parameters-extractor="jobParameterExtractor" />
    </batch:step>

    <batch:job id="jobRef">

        <batch:step id="dummyStep" next="skippedItemsDecision">
            <batch:tasklet ref="dummyTasklet"/>
            <batch:listeners>
                <batch:listener ref="stepListener" />
            </batch:listeners>
        </batch:step>

        <batch:step id="xxx.readItems" next="xxx.then.finish">
            <batch:tasklet>
                <batch:chunk reader="xxxChunkReader" processor="chunkProcessor"
                    writer="itemWriter" commit-interval="100">
                </batch:chunk>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="taskletListener" />
            </batch:listeners>
        </batch:step>

        ...