У меня возникла проблема с созданием асинхронного процессора в Spring Batch. Мой процессор получает ID
от reader
и создает объект на основе ответа от вызова SOAP
. Иногда для 1 входа (ID
) должно быть, например, 60-100 SOAP
вызовов, а иногда и просто 1. Я пытался сделать многопоточный шаг, который он обрабатывал, например, 50 входов, но это было бесполезно, потому что 49 потоков выполнили свою работу за 1 секунду и были заблокирован, ожидая этого, который выполнял 60-100 SOAP
вызовов. Теперь я использую AsyncItemProcessor
+ AsyncItemWriter
но это решение работает медленно для меня. Поскольку мой ввод (IDs
) большой, около 25 тыс. Элементов, прочитанных из БД, я хотел бы запустить ~ 50-100 входов в момент времени.
Вот моя конфигурация:
@Configuration
public class BatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private DatabaseConfig databaseConfig;
@Value(value = "classpath:Categories.txt")
private Resource categories;
@Bean
public Job processJob() throws Exception {
return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
}
@Bean
public Step orderStep1() throws Exception {
return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
}
@Bean
public JobExecutionListener listener() {
return new JobCompletionListener();
}
@Bean
public ItemWriter asyncItemWriter() {
AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter());
return asyncItemWriter;
}
@Bean
public ItemWriter<CategoryDailyResult> itemWriter(){
return new Writer();
}
@Bean
public ItemProcessor asyncItemProcessor() {
AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor());
asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
return new Processor();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(50);
return taskExecutor;
}
@Bean(destroyMethod = "")
public ItemReader<Category> reader() throws Exception {
String query = "select c from Category c where not exists elements(c.children)";
JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
reader.setSaveState(false);
reader.setQueryString(query);
reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
reader.setPageSize(1);
return reader;
}
}
Как я могу увеличить свое приложение? Может быть, я делаю что-то неправильно? Любая обратная связь приветствуется;)
@Edit: для ввода идентификаторов: от 1 до 100 я хочу, например, 50 потоков, которые выполняют процессор. Я хочу, чтобы они не блокировали друг друга: Thread1 обрабатывает входной сигнал "1" в течение 2 минут, и в это время я хочу, чтобы Thread2 обрабатывал ввод "2", "8", "64", которые являются небольшими и выполняются за несколько секунд.
@Edit2: Моя цель: у меня есть 25k ID в базе данных, я читаю их с JpaPagingItemReader
и каждый идентификатор обрабатывается процессором. Каждый элемент не зависит друг от друга. Для каждого ID я делаю SOAP
вызов в 0-100 раз в цикле, а затем создаю Object, который я передаю в Writer
и сохраняю в базе данных. Как я могу получить лучшую производительность для такой задачи?