Multi-Threaded Steps and Async Processing for Performance

Introduction

A single-threaded Spring Batch step processes one chunk at a time: read N items, process N items, write N items, repeat. For large data sets this becomes a bottleneck. Two of Spring Batch's in-JVM scaling options address it:

Approach            | How it works                                           | Use when
Multi-threaded step | Multiple threads each process independent chunks       | Reader is thread-safe (e.g., JdbcPagingItemReader)
AsyncItemProcessor  | Processing runs concurrently; writes remain sequential | I/O-bound processors (REST calls, slow enrichment)

This article covers both, plus the thread-safety requirements you must meet.


Multi-Threaded Steps

A multi-threaded step adds a TaskExecutor to the step builder. Spring Batch runs multiple chunk-processing threads concurrently — each thread reads a chunk, processes it, and writes it independently.

@Bean
public Step multiThreadedImportStep(JobRepository jobRepository,
                                     PlatformTransactionManager tx,
                                     JdbcPagingItemReader<Order> reader,
                                     ItemProcessor<Order, ProcessedOrder> processor,
                                     JdbcBatchItemWriter<ProcessedOrder> writer) {

    return new StepBuilder("multiThreadedImportStep", jobRepository)
            .<Order, ProcessedOrder>chunk(200, tx)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(stepTaskExecutor())
            .build();
}

@Bean
public TaskExecutor stepTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(0);          // no queue — reject immediately if all threads busy
    executor.setThreadNamePrefix("batch-step-");
    executor.initialize();
    return executor;
}

With 4 threads and a chunk size of 200, up to 800 items are in flight at once instead of 200.
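
To make the execution model concrete, here is a minimal plain-JDK sketch of several workers that each read a chunk, process it, and write it. It is an illustration only, not how Spring Batch is implemented: ChunkWorkersSketch, run, and readChunk are hypothetical names, and for simplicity a whole chunk is read under one lock, whereas a real step calls read() once per item.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/** Plain-JDK model of a multi-threaded chunk step; all names here are hypothetical. */
final class ChunkWorkersSketch {

    /** Runs read -> process -> write on `threads` workers and returns the number of items written. */
    static int run(int totalItems, int chunkSize, int threads) {
        Iterator<Integer> source = IntStream.range(0, totalItems).iterator();
        // Stands in for a thread-safe writer such as a JDBC batch insert
        ConcurrentLinkedQueue<Integer> sink = new ConcurrentLinkedQueue<>();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                List<Integer> chunk;
                while (!(chunk = readChunk(source, chunkSize)).isEmpty()) {
                    List<Integer> processed = new ArrayList<>(chunk.size());
                    for (int item : chunk) {
                        processed.add(item * 2);   // "process" phase
                    }
                    sink.addAll(processed);        // "write" phase, once per chunk
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.size();
    }

    /** Reads are serialized on the shared source; each call returns at most chunkSize items. */
    private static List<Integer> readChunk(Iterator<Integer> source, int chunkSize) {
        List<Integer> chunk = new ArrayList<>(chunkSize);
        synchronized (source) {
            while (chunk.size() < chunkSize && source.hasNext()) {
                chunk.add(source.next());
            }
        }
        return chunk;
    }
}
```

Calling ChunkWorkersSketch.run(1000, 200, 4) writes all 1000 items, but the order in which chunks reach the sink is not deterministic. The same holds for a real multi-threaded step.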


Thread-Safety Requirements

Critical: the reader, processor, and writer must be thread-safe when used in a multi-threaded step.

Readers

Reader               | Thread-safe? | Use in multi-threaded step?
JdbcPagingItemReader | Yes          | Yes: reads and page fetches are synchronized internally
JdbcCursorItemReader | No           | No: single ResultSet, not thread-safe
FlatFileItemReader   | No           | No: single file position pointer
JpaPagingItemReader  | Yes          | Yes: reads and page fetches are synchronized internally

If you need to use a non-thread-safe reader (e.g., a flat file), use SynchronizedItemStreamReader:

@Bean
public SynchronizedItemStreamReader<Order> synchronizedCsvReader(
        FlatFileItemReader<Order> delegate) {

    SynchronizedItemStreamReader<Order> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}

SynchronizedItemStreamReader wraps every read() call in a synchronized block. This ensures correctness but reduces throughput — all threads contend on a single lock. It is useful when you want multi-threaded processing but your reader is a file.
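
The idea behind such a wrapper can be sketched in plain Java. This is not the Spring Batch source: Reader, IteratorReader, and SynchronizedReader are hypothetical stand-ins that only mirror the delegate-plus-lock design described above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class SynchronizedReaderSketch {

    /** Minimal reader contract: returns the next item, or null when the input is exhausted. */
    interface Reader<T> {
        T read();
    }

    /** Not thread-safe on its own: hasNext() and next() can interleave across threads. */
    static final class IteratorReader<T> implements Reader<T> {
        private final Iterator<T> it;
        IteratorReader(Iterator<T> it) { this.it = it; }
        @Override public T read() { return it.hasNext() ? it.next() : null; }
    }

    /** Delegating wrapper: one lock around read(), so only one thread touches the delegate at a time. */
    static final class SynchronizedReader<T> implements Reader<T> {
        private final Reader<T> delegate;
        SynchronizedReader(Reader<T> delegate) { this.delegate = delegate; }
        @Override public synchronized T read() { return delegate.read(); }
    }

    /** Drains the wrapped reader from several threads and checks every line was read exactly once. */
    static boolean eachLineReadOnce(int lines, int threads) {
        List<String> data = IntStream.range(0, lines)
                .mapToObj(i -> "line-" + i)
                .collect(Collectors.toList());
        Reader<String> reader = new SynchronizedReader<>(new IteratorReader<>(data.iterator()));
        Set<String> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                String line;
                while ((line = reader.read()) != null) {
                    reads.incrementAndGet();
                    seen.add(line);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("readers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reads.get() == lines && seen.size() == lines;
    }
}
```

The lock protects correctness only. Which thread receives which line is arbitrary, so downstream logic should not depend on file order.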

Processors

Processors are called once per item. A stateless processor is thread-safe by definition. If your processor holds mutable state (a cache, a counter), guard that state with a thread-safe structure or explicit synchronization:

@Component
public class CachedEnrichmentProcessor implements ItemProcessor<Order, Order> {

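    // ConcurrentHashMap is thread-safe; computeIfAbsent runs the lookup at most once per key
    private final Map<Long, String> tierCache = new ConcurrentHashMap<>();
    private final JdbcTemplate jdbc;

    // Constructor injection: the final field must be assigned for the class to compile
    public CachedEnrichmentProcessor(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }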

    @Override
    public Order process(Order order) {
        String tier = tierCache.computeIfAbsent(order.getCustomerId(),
                id -> jdbc.queryForObject(
                        "SELECT tier FROM customers WHERE customer_id = ?",
                        String.class, id));
        order.setCustomerTier(tier);
        return order;
    }
}
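
The caching behaviour relied on here can be checked without a database. The sketch below is a hypothetical stand-in (TierCacheSketch, with a made-up tier rule instead of the JDBC query) that counts how often the lookup function runs when many threads ask for the same customers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TierCacheSketch {

    private final Map<Long, String> tierCache = new ConcurrentHashMap<>();
    private final AtomicInteger lookups = new AtomicInteger();

    /** Returns the cached tier, running the (simulated) lookup only for keys not yet cached. */
    String tierFor(long customerId) {
        return tierCache.computeIfAbsent(customerId, id -> {
            lookups.incrementAndGet();                 // counts how often the "query" runs
            return id % 2 == 0 ? "GOLD" : "SILVER";    // hypothetical tier rule, not real data
        });
    }

    /** Issues `calls` concurrent requests over `distinctCustomers` ids; returns the lookup count. */
    static int lookupsFor(int calls, int distinctCustomers, int threads) {
        TierCacheSketch cache = new TierCacheSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < calls; i++) {
            long customerId = i % distinctCustomers;
            pool.execute(() -> cache.tierFor(customerId));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("requests did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cache.lookups.get();
    }
}
```

One trade-off to keep in mind: computeIfAbsent blocks other updates to the same map bin while the lookup runs, so a slow query inside it can briefly stall threads asking for nearby keys.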

Writers

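JdbcBatchItemWriter is thread-safe once configured, because each chunk write call is independent. FlatFileItemWriter is not thread-safe. For concurrent writes to a file, wrap the writer in a synchronized decorator (Spring Batch 4.3 and later ship SynchronizedItemStreamWriter) or write to separate per-thread files and merge them afterwards.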
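
The per-thread-file option can be sketched with the JDK alone. This is an assumption-laden illustration, not a Spring Batch recipe: PerThreadFileMergeSketch, the temp-file prefixes, and the CSV content are all made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerThreadFileMergeSketch {

    /** Each worker writes to a private temp file; the parts are concatenated afterwards. */
    static long writeAndMerge(int workers, int linesPerWorker) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Path>> parts = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                final int worker = w;
                parts.add(pool.submit(() -> {
                    Path part = Files.createTempFile("orders-part-" + worker + "-", ".csv");
                    List<String> lines = new ArrayList<>();
                    for (int i = 0; i < linesPerWorker; i++) {
                        lines.add(worker + "," + i);
                    }
                    Files.write(part, lines);   // no other thread touches this file
                    return part;
                }));
            }

            // Merge phase runs on one thread, so no locking is needed
            Path merged = Files.createTempFile("orders-merged-", ".csv");
            for (Future<Path> future : parts) {
                Path part = future.get();       // waits for that worker to finish
                Files.write(merged, Files.readAllLines(part), StandardOpenOption.APPEND);
                Files.delete(part);
            }
            long mergedLines = Files.readAllLines(merged).size();
            Files.delete(merged);
            return mergedLines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {                 // InterruptedException or ExecutionException
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Compared with a synchronized wrapper, this avoids lock contention during the write phase at the cost of an extra sequential merge pass.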


Disabling saveState for Multi-Threaded Steps

With multiple threads reading from a JdbcPagingItemReader, there is no single “current position” to save for restart. Disable saveState to prevent incorrect restart state:

@Bean
public JdbcPagingItemReader<Order> multiThreadedReader(DataSource dataSource) {
    MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
    qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
    qp.setFromClause("FROM orders");
    qp.setWhereClause("WHERE status = 'PENDING'");
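    // Fully qualified because the domain class is also named Order
    qp.setSortKeys(Map.of("order_id", org.springframework.batch.item.database.Order.ASCENDING));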

    return new JdbcPagingItemReaderBuilder<Order>()
            .name("multiThreadedOrderReader")
            .dataSource(dataSource)
            .queryProvider(qp)
            .pageSize(200)
            .rowMapper(new BeanPropertyRowMapper<>(Order.class))
            .saveState(false)   // disable — restart not safe with multi-threading
            .build();
}

With saveState(false), if the step fails mid-run, it restarts from the beginning — but with idempotent writes (upsert) this is safe.
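
Why an upsert makes a restart from the beginning safe can be shown with a small model. The sketch below uses a Map as a stand-in for a table with a unique key and a List as a stand-in for a table without one. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdempotentRestartSketch {

    /** Upsert semantics: writing the same order id twice still leaves exactly one row. */
    static int rowsWithUpsert(int totalOrders, int writtenBeforeFailure) {
        Map<Long, String> table = new HashMap<>();
        // First attempt: some orders are written, then the step fails
        for (long id = 1; id <= writtenBeforeFailure; id++) {
            table.put(id, "PROCESSED");
        }
        // Restart: no saved position, so every order is read and written again
        for (long id = 1; id <= totalOrders; id++) {
            table.put(id, "PROCESSED");
        }
        return table.size();
    }

    /** Append-only semantics (no unique key): the re-read orders are duplicated. */
    static int rowsWithPlainAppend(int totalOrders, int writtenBeforeFailure) {
        List<Long> table = new ArrayList<>();
        for (long id = 1; id <= writtenBeforeFailure; id++) {
            table.add(id);
        }
        for (long id = 1; id <= totalOrders; id++) {
            table.add(id);
        }
        return table.size();
    }
}
```

With 1000 orders and a failure after 400 writes, the upsert variant ends with 1000 rows while the append variant ends with 1400.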


Throttle Limit

If your thread pool is larger than what the downstream database can handle, limit the number of concurrent chunks with .throttleLimit():

return new StepBuilder("step", jobRepository)
        .<Order, ProcessedOrder>chunk(200, tx)
        .reader(reader).processor(processor).writer(writer)
        .taskExecutor(stepTaskExecutor())
        .throttleLimit(4)   // max 4 active chunks at a time (default = 4)
        .build();

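throttleLimit should equal your thread pool size. Setting it lower leaves pool threads idle. Setting it higher lets the step submit more chunks than the pool can accept, which causes TaskRejectedException when the executor has no queue (queue capacity 0, as configured above). Note that throttleLimit() is deprecated as of Spring Batch 5.0; it still works in 5.x, but the documented direction is to bound concurrency through the TaskExecutor itself (pool size and queue capacity).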
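
The rejection behaviour comes from the underlying JDK pool, so it can be reproduced without Spring. The sketch below assumes the JDK equivalent of a zero-capacity executor, a ThreadPoolExecutor backed by a SynchronousQueue, and counts how many submissions are refused while every thread is busy. RejectionSketch is a hypothetical name. Spring reports the same condition as TaskRejectedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionSketch {

    /** Submits `tasks` blocking tasks to a pool of `threads` with no queue; returns the rejected count. */
    static int rejectedCount(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();    // keep the thread busy until the test is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;                 // no idle thread and nowhere to queue
                }
            }
        } finally {
            release.countDown();                // let the busy tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With 4 threads, the fifth and sixth submissions are rejected, which is what happens when more chunks are allowed in flight than the pool has threads.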


AsyncItemProcessor + AsyncItemWriter

For I/O-bound processors (REST API calls, slow lookups), AsyncItemProcessor runs each item’s processing in a separate thread. The step’s main thread collects the Future<O> results and passes them to AsyncItemWriter, which resolves them before delegating to the real writer.

Both classes ship in the spring-batch-integration module, so add it as a dependency:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>

@Bean
public AsyncItemProcessor<Order, Order> asyncFraudCheckProcessor(
        FraudCheckProcessor syncProcessor) throws Exception {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("fraud-check-");
    executor.initialize();

    AsyncItemProcessor<Order, Order> async = new AsyncItemProcessor<>();
    async.setDelegate(syncProcessor);
    async.setTaskExecutor(executor);
    async.afterPropertiesSet();
    return async;
}

@Bean
public AsyncItemWriter<Order> asyncOrderWriter(
        JdbcBatchItemWriter<Order> syncWriter) throws Exception {
    AsyncItemWriter<Order> async = new AsyncItemWriter<>();
    async.setDelegate(syncWriter);
    async.afterPropertiesSet();
    return async;
}

The step’s output type changes from O to Future<O>:

@Bean
public Step asyncProcessingStep(
        JobRepository jobRepository,
        PlatformTransactionManager tx,
        JdbcPagingItemReader<Order> reader,
        AsyncItemProcessor<Order, Order> asyncProcessor,
        AsyncItemWriter<Order> asyncWriter) {

    return new StepBuilder("asyncProcessingStep", jobRepository)
            .<Order, Future<Order>>chunk(100, tx)
            .reader(reader)
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build();
}
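
The hand-off between the two halves can be sketched with plain java.util.concurrent types. This is a simplified model of the pattern, not the Spring Batch implementation: AsyncProcessSketch, processAsync, and resolve are hypothetical names, and the upper-casing function stands in for a slow delegate processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class AsyncProcessSketch {

    /** "Processor" half: hands each item to the pool and returns immediately with a Future. */
    static <I, O> List<Future<O>> processAsync(List<I> chunk, Function<I, O> delegate,
                                               ExecutorService pool) {
        List<Future<O>> futures = new ArrayList<>(chunk.size());
        for (I item : chunk) {
            futures.add(pool.submit(() -> delegate.apply(item)));
        }
        return futures;
    }

    /** "Writer" half: unwraps the futures in chunk order so the real write stays sequential. */
    static <O> List<O> resolve(List<Future<O>> futures) {
        List<O> results = new ArrayList<>(futures.size());
        try {
            for (Future<O> future : futures) {
                results.add(future.get());      // blocks until that item is processed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return results;
    }

    /** Processes a three-item chunk on 8 threads and joins the resolved results. */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<String>> futures =
                    processAsync(List.of("a", "b", "c"), s -> s.toUpperCase(), pool);
            return String.join(",", resolve(futures));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the futures are resolved in the order the items were read, the output order matches the input order even though processing overlaps.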

Throughput gain: for a processor with 200 ms latency (e.g., a fraud API), 8 threads give up to ~8x throughput, about 40 items/s instead of 5 items/s, provided the downstream service can absorb the extra concurrency.
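
The arithmetic behind that estimate is an upper bound that ignores read and write time. The helper below is a hypothetical back-of-the-envelope calculator, not part of any library.

```java
final class ThroughputEstimate {

    /** Upper bound in items/s when each item blocks for latencyMillis and `threads` items overlap. */
    static double itemsPerSecond(int threads, double latencyMillis) {
        if (latencyMillis <= 0) {
            throw new IllegalArgumentException("latencyMillis must be positive");
        }
        return threads * 1000.0 / latencyMillis;
    }
}
```

ThroughputEstimate.itemsPerSecond(1, 200) gives 5.0 and itemsPerSecond(8, 200) gives 40.0, matching the figures above.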


Choosing Thread Pool Size

Start with the formula, then measure:

Optimal threads ≈ N_cores × (1 + wait_time / compute_time)
  • CPU-bound processors (sorting, hashing, calculations): threads = N_cores
  • I/O-bound processors (DB lookups, REST calls with 200ms latency): threads = N_cores × 10–20
  • Writer throughput (MySQL inserts): benchmark with different chunk sizes — typically diminishing returns above 8 threads
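
The sizing formula above translates directly into a small helper. PoolSizing and its parameter names are hypothetical, and the wait and compute times are values you would measure for your own processor. Treat the result as a starting point for benchmarking, not a final answer.

```java
final class PoolSizing {

    /** threads ~= cores * (1 + wait / compute), rounded to the nearest whole thread. */
    static int optimalThreads(int cores, double waitMillis, double computeMillis) {
        if (cores <= 0 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("cores and computeMillis must be positive, waitMillis >= 0");
        }
        return (int) Math.round(cores * (1 + waitMillis / computeMillis));
    }
}
```

For example, 8 cores with no waiting (CPU-bound) gives 8 threads, while 8 cores with 200 ms of waiting per 20 ms of computation gives 88 threads, an 11x multiplier that falls in the 10x to 20x range quoted for I/O-bound work.
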

# JVM config for batch server
-XX:+UseG1GC
-Xms2g -Xmx4g

# HikariCP: pool size should be at least the number of concurrent step threads
spring.datasource.hikari.maximum-pool-size=8
spring.datasource.hikari.minimum-idle=4

Complete Multi-Threaded Job

@Configuration
@RequiredArgsConstructor
public class MultiThreadedImportJobConfig {

    private final DataSource dataSource;
    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;

    @Bean
    public JdbcPagingItemReader<Order> pendingOrderReader() {
        MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
        qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
        qp.setFromClause("FROM orders");
        qp.setWhereClause("WHERE status = 'PENDING'");
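        // Fully qualified because the domain class is also named Order
        qp.setSortKeys(Map.of("order_id", org.springframework.batch.item.database.Order.ASCENDING));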

        return new JdbcPagingItemReaderBuilder<Order>()
                .name("pendingOrderReader")
                .dataSource(dataSource)
                .queryProvider(qp)
                .pageSize(200)
                .rowMapper(new BeanPropertyRowMapper<>(Order.class))
                .saveState(false)
                .build();
    }

    @Bean
    public ItemProcessor<Order, ProcessedOrder> orderProcessor() {
        // Stateless — inherently thread-safe
        return order -> {
            ProcessedOrder po = new ProcessedOrder();
            po.setOrderId(order.getOrderId());
            po.setCustomerId(order.getCustomerId());
            po.setFinalAmount(order.getAmount().multiply(new BigDecimal("1.1")));
            po.setStatus("PROCESSED");
            return po;
        };
    }

    @Bean
    public JdbcBatchItemWriter<ProcessedOrder> processedOrderWriter() {
        return new JdbcBatchItemWriterBuilder<ProcessedOrder>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_orders (order_id, customer_id, final_amount, status) " +
                     "VALUES (:orderId, :customerId, :finalAmount, :status) " +
                     "ON DUPLICATE KEY UPDATE final_amount = VALUES(final_amount), status = VALUES(status)")
                .beanMapped()
                .assertUpdates(false)
                .build();
    }

    @Bean
    public TaskExecutor stepTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("batch-step-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Step multiThreadedStep() {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Order, ProcessedOrder>chunk(200, tx)
                .reader(pendingOrderReader())
                .processor(orderProcessor())
                .writer(processedOrderWriter())
                .taskExecutor(stepTaskExecutor())
                .faultTolerant()
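                // Parent of DeadlockLoserDataAccessException (deprecated since Spring Framework 6.0.3)
                // and CannotAcquireLockException, so deadlocks are retried whichever one is thrown
                .retry(PessimisticLockingFailureException.class)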
                .retryLimit(3)
                .build();
    }

    @Bean
    public Job multiThreadedImportJob() {
        return new JobBuilder("multiThreadedImportJob", jobRepository)
                .start(multiThreadedStep())
                .build();
    }
}

Key Takeaways

  • Add .taskExecutor() to a step to run chunks concurrently. Set saveState(false) on the reader — multi-threaded restart positions are not reliable.
  • JdbcPagingItemReader and JpaPagingItemReader are thread-safe. FlatFileItemReader and JdbcCursorItemReader are not — wrap with SynchronizedItemStreamReader if needed.
  • AsyncItemProcessor + AsyncItemWriter parallelises the processing phase only — writing remains sequential per chunk. Use it for I/O-bound processors.
  • Match thread pool size, throttleLimit, and maximum-pool-size in HikariCP — otherwise connections will be exhausted or threads will queue.
  • Idempotent writes (ON DUPLICATE KEY UPDATE) are essential when saveState(false) — the step may re-read records on restart.

What’s Next

Article 21 covers partitioning — splitting a large data set into independent partitions processed concurrently, each with its own StepExecution and metadata.