Multi-Threaded Steps and Async Processing for Performance
Introduction
A single-threaded Spring Batch step processes one chunk at a time — read N items, process N items, write N items, repeat. For large data sets this is a bottleneck. Spring Batch offers two in-JVM scaling options:
| Approach | How it works | Use when |
|---|---|---|
| Multi-threaded step | Multiple threads each process independent chunks | Reader is thread-safe (JdbcPagingItemReader) |
| AsyncItemProcessor | Processing runs concurrently; writes remain sequential | I/O-bound processors (REST calls, slow enrichment) |
This article covers both, plus the thread-safety requirements you must meet.
Multi-Threaded Steps
A multi-threaded step adds a TaskExecutor to the step builder. Spring Batch runs multiple chunk-processing threads concurrently — each thread reads a chunk, processes it, and writes it independently.
@Bean
public Step multiThreadedImportStep(JobRepository jobRepository,
PlatformTransactionManager tx,
JdbcPagingItemReader<Order> reader,
ItemProcessor<Order, ProcessedOrder> processor,
JdbcBatchItemWriter<ProcessedOrder> writer) {
return new StepBuilder("multiThreadedImportStep", jobRepository)
.<Order, ProcessedOrder>chunk(200, tx)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(stepTaskExecutor())
.build();
}
@Bean
public TaskExecutor stepTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(0); // no queue — reject immediately if all threads busy
executor.setThreadNamePrefix("batch-step-");
executor.initialize();
return executor;
}
With 4 threads and a chunk size of 200, up to four chunks — 800 items — are in flight at once instead of a single chunk of 200.
Thread-Safety Requirements
Critical: the reader, processor, and writer must be thread-safe when used in a multi-threaded step.
Readers
| Reader | Thread-safe? | Use in multi-threaded step? |
|---|---|---|
| JdbcPagingItemReader | Yes | Yes — each page query is independent |
| JdbcCursorItemReader | No | No — single ResultSet, not thread-safe |
| FlatFileItemReader | No | No — single file position pointer |
| JpaPagingItemReader | Yes | Yes — each page uses its own EntityManager |
If you need to use a non-thread-safe reader (e.g., a flat file), use SynchronizedItemStreamReader:
@Bean
public SynchronizedItemStreamReader<Order> synchronizedCsvReader(
FlatFileItemReader<Order> delegate) {
SynchronizedItemStreamReader<Order> reader = new SynchronizedItemStreamReader<>();
reader.setDelegate(delegate);
return reader;
}
SynchronizedItemStreamReader wraps every read() call in a synchronized block. This ensures correctness but reduces throughput — all threads contend on a single lock. It is useful when you want multi-threaded processing but your input is a flat file.
Processors
Processors are called once per item. A stateless processor is thread-safe by definition. If your processor holds mutable state (a cache, a counter), use thread-safe data structures or synchronize access:
@Component
public class CachedEnrichmentProcessor implements ItemProcessor<Order, Order> {
// ConcurrentHashMap is thread-safe; computeIfAbsent loads each tier at most once
private final Map<Long, String> tierCache = new ConcurrentHashMap<>();
private final JdbcTemplate jdbc;
public CachedEnrichmentProcessor(JdbcTemplate jdbc) {
this.jdbc = jdbc;
}
@Override
public Order process(Order order) {
String tier = tierCache.computeIfAbsent(order.getCustomerId(),
id -> jdbc.queryForObject(
"SELECT tier FROM customers WHERE customer_id = ?",
String.class, id));
order.setCustomerTier(tier);
return order;
}
}
Writers
JdbcBatchItemWriter is thread-safe — each chunk write call is independent. FlatFileItemWriter is not thread-safe. For concurrent writes to a file, use a synchronized wrapper or write to separate per-thread files then merge.
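If you do need a single output file, Spring Batch also ships SynchronizedItemStreamWriter (available since 4.3) — the writer-side counterpart of SynchronizedItemStreamReader, serialising write() calls on the delegate. A minimal sketch, assuming a FlatFileItemWriter bean for ProcessedOrder is defined elsewhere:

```java
@Bean
public SynchronizedItemStreamWriter<ProcessedOrder> synchronizedCsvWriter(
        FlatFileItemWriter<ProcessedOrder> delegate) {
    // Every write() on the delegate runs under a single lock — correct, but a
    // contention point, just like SynchronizedItemStreamReader on the read side
    SynchronizedItemStreamWriter<ProcessedOrder> writer = new SynchronizedItemStreamWriter<>();
    writer.setDelegate(delegate);
    return writer;
}
```

Per-thread files merged afterwards remain the higher-throughput option, since no lock is shared.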
Disabling saveState for Multi-Threaded Steps
With multiple threads reading from a JdbcPagingItemReader, there is no single “current position” to save for restart. Disable saveState to prevent incorrect restart state:
@Bean
public JdbcPagingItemReader<Order> multiThreadedReader(DataSource dataSource) {
MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
qp.setFromClause("FROM orders");
qp.setWhereClause("WHERE status = 'PENDING'");
qp.setSortKeys(Map.of("order_id", org.springframework.batch.item.database.Order.ASCENDING)); // Spring Batch's Order enum, qualified to avoid a clash with the Order domain class
return new JdbcPagingItemReaderBuilder<Order>()
.name("multiThreadedOrderReader")
.dataSource(dataSource)
.queryProvider(qp)
.pageSize(200)
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.saveState(false) // disable — restart not safe with multi-threading
.build();
}
With saveState(false), if the step fails mid-run, it restarts from the beginning — but with idempotent writes (upsert) this is safe.
Throttle Limit
If your thread pool is larger than what the downstream database can handle, limit the number of concurrent chunks with .throttleLimit():
return new StepBuilder("step", jobRepository)
.<Order, ProcessedOrder>chunk(200, tx)
.reader(reader).processor(processor).writer(writer)
.taskExecutor(stepTaskExecutor())
.throttleLimit(4) // max 4 active chunks at a time (default = 4)
.build();
throttleLimit should match your thread pool size. Setting it lower than the pool size leaves threads idle; setting it higher than the pool (with queueCapacity(0)) triggers TaskRejectedException. Note that .throttleLimit() is deprecated as of Spring Batch 5 — the recommended approach is to bound concurrency through the TaskExecutor's pool size itself.
AsyncItemProcessor + AsyncItemWriter
For I/O-bound processors (REST API calls, slow lookups), AsyncItemProcessor runs each item’s processing in a separate thread. The step’s main thread collects Future<O> results and passes them to AsyncItemWriter which resolves them before writing.
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
@Bean
public AsyncItemProcessor<Order, Order> asyncFraudCheckProcessor(
FraudCheckProcessor syncProcessor) throws Exception {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("fraud-check-");
executor.initialize();
AsyncItemProcessor<Order, Order> async = new AsyncItemProcessor<>();
async.setDelegate(syncProcessor);
async.setTaskExecutor(executor);
async.afterPropertiesSet();
return async;
}
@Bean
public AsyncItemWriter<Order> asyncOrderWriter(
JdbcBatchItemWriter<Order> syncWriter) throws Exception {
AsyncItemWriter<Order> async = new AsyncItemWriter<>();
async.setDelegate(syncWriter);
async.afterPropertiesSet();
return async;
}
The step generic types change to Future<O>:
@Bean
public Step asyncProcessingStep(
JobRepository jobRepository,
PlatformTransactionManager tx,
JdbcPagingItemReader<Order> reader,
AsyncItemProcessor<Order, Order> asyncProcessor,
AsyncItemWriter<Order> asyncWriter) {
return new StepBuilder("asyncProcessingStep", jobRepository)
.<Order, Future<Order>>chunk(100, tx)
.reader(reader)
.processor(asyncProcessor)
.writer(asyncWriter)
.build();
}
Throughput gain: For a processor with 200ms latency (e.g., fraud API), 8 threads gives ~8x throughput — 40 items/s instead of 5 items/s.
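The arithmetic behind that estimate can be checked directly. This is a back-of-the-envelope model only — it assumes perfect scaling and ignores chunk boundaries and writer time; 200 ms is the per-item processor latency from the example above:

```java
public class ThroughputEstimate {

    // Items/s for a single thread that blocks latencyMs per item
    static double perThread(long latencyMs) {
        return 1000.0 / latencyMs;
    }

    // Ideal scaling: each thread processes its own items independently
    static double withThreads(long latencyMs, int threads) {
        return perThread(latencyMs) * threads;
    }

    public static void main(String[] args) {
        System.out.println(withThreads(200, 1)); // 5.0 items/s sequential
        System.out.println(withThreads(200, 8)); // 40.0 items/s with 8 async threads
    }
}
```

Real gains fall short of this ceiling once the fraud API itself, the database pool, or the sequential write phase becomes the bottleneck.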
Choosing Thread Pool Size
Start with the formula, then measure:
Optimal threads ≈ N_cores × (1 + wait_time / compute_time)
- CPU-bound processors (sorting, hashing, calculations): threads ≈ N_cores
- I/O-bound processors (DB lookups, REST calls with 200ms latency): threads ≈ N_cores × 10–20
- Writer throughput (MySQL inserts): benchmark with different chunk sizes — typically diminishing returns above 8 threads
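As a sanity check, the formula is easy to evaluate for concrete numbers. The 150 ms / 10 ms split below is an assumed example, not a measured value:

```java
public class PoolSizing {

    // Optimal threads ≈ cores × (1 + waitTime / computeTime)
    static int optimalThreads(int cores, double waitMs, double computeMs) {
        return (int) Math.round(cores * (1 + waitMs / computeMs));
    }

    public static void main(String[] args) {
        // CPU-bound: negligible wait, so the pool collapses to the core count
        System.out.println(optimalThreads(8, 0, 10));   // 8
        // I/O-bound: 150 ms waiting on a REST call per 10 ms of CPU work
        System.out.println(optimalThreads(8, 150, 10)); // 128
    }
}
```

The 128 figure is an upper bound from the formula (N_cores × 16, within the 10–20× range above) — in practice, cap the pool at what the downstream API and connection pool can tolerate.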
# JVM config for batch server
-XX:+UseG1GC
-Xms2g -Xmx4g
# HikariCP — pool size should match thread count
spring.datasource.hikari.maximum-pool-size=8
spring.datasource.hikari.minimum-idle=4
Complete Multi-Threaded Job
@Configuration
@RequiredArgsConstructor
public class MultiThreadedImportJobConfig {
private final DataSource dataSource;
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
@Bean
public JdbcPagingItemReader<Order> pendingOrderReader() {
MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
qp.setFromClause("FROM orders");
qp.setWhereClause("WHERE status = 'PENDING'");
qp.setSortKeys(Map.of("order_id", org.springframework.batch.item.database.Order.ASCENDING)); // Spring Batch's Order enum, qualified to avoid a clash with the Order domain class
return new JdbcPagingItemReaderBuilder<Order>()
.name("pendingOrderReader")
.dataSource(dataSource)
.queryProvider(qp)
.pageSize(200)
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.saveState(false)
.build();
}
@Bean
public ItemProcessor<Order, ProcessedOrder> orderProcessor() {
// Stateless — inherently thread-safe
return order -> {
ProcessedOrder po = new ProcessedOrder();
po.setOrderId(order.getOrderId());
po.setCustomerId(order.getCustomerId());
po.setFinalAmount(order.getAmount().multiply(new BigDecimal("1.1")));
po.setStatus("PROCESSED");
return po;
};
}
@Bean
public JdbcBatchItemWriter<ProcessedOrder> processedOrderWriter() {
return new JdbcBatchItemWriterBuilder<ProcessedOrder>()
.dataSource(dataSource)
.sql("INSERT INTO processed_orders (order_id, customer_id, final_amount, status) " +
"VALUES (:orderId, :customerId, :finalAmount, :status) " +
"ON DUPLICATE KEY UPDATE final_amount = VALUES(final_amount), status = VALUES(status)")
.beanMapped()
.assertUpdates(false)
.build();
}
@Bean
public TaskExecutor stepTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("batch-step-");
executor.initialize();
return executor;
}
@Bean
public Step multiThreadedStep() {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Order, ProcessedOrder>chunk(200, tx)
.reader(pendingOrderReader())
.processor(orderProcessor())
.writer(processedOrderWriter())
.taskExecutor(stepTaskExecutor())
.faultTolerant()
.retry(DeadlockLoserDataAccessException.class)
.retryLimit(3)
.build();
}
@Bean
public Job multiThreadedImportJob() {
return new JobBuilder("multiThreadedImportJob", jobRepository)
.start(multiThreadedStep())
.build();
}
}
Key Takeaways
- Add `.taskExecutor()` to a step to run chunks concurrently.
- Set `saveState(false)` on the reader — multi-threaded restart positions are not reliable.
- `JdbcPagingItemReader` and `JpaPagingItemReader` are thread-safe. `FlatFileItemReader` and `JdbcCursorItemReader` are not — wrap with `SynchronizedItemStreamReader` if needed.
- `AsyncItemProcessor` + `AsyncItemWriter` parallelises the processing phase only — writing remains sequential per chunk. Use it for I/O-bound processors.
- Match thread pool size, `throttleLimit`, and HikariCP `maximum-pool-size` — otherwise connections will be exhausted or threads will queue.
- Idempotent writes (`ON DUPLICATE KEY UPDATE`) are essential with `saveState(false)` — the step may re-read records on restart.
What’s Next
Article 21 covers partitioning — splitting a large data set into independent partitions processed concurrently, each with its own StepExecution and metadata.