Partitioning: Splitting Work Across Parallel Workers
Introduction
Multi-threaded steps (Article 20) run multiple chunks concurrently from a single reader. Partitioning is different — it splits the data into independent slices before processing starts, then runs each slice as its own StepExecution with its own reader, processor, writer, and metadata.
This gives you:
- True independence between partitions — one partition’s failure doesn’t affect others
- A separate StepExecution row per partition in BATCH_STEP_EXECUTION — full visibility into per-partition progress
- The ability to distribute partitions across multiple JVMs (remote partitioning, covered in Article 22)
Partitioning Architecture
ManagerStep (PartitionStep)
│
├── Partitioner → creates N ExecutionContexts (one per partition)
├── PartitionHandler → distributes partitions to workers
│
└── Worker Steps (run per partition)
├── ItemReader → reads only its slice of data
├── ItemProcessor
└── ItemWriter
The manager step runs once: it calls Partitioner.partition(gridSize) to produce N ExecutionContext maps, then hands them to the PartitionHandler. The PartitionHandler runs N worker step executions — locally on a thread pool or remotely across JVMs.
Implementing a Partitioner
A Partitioner returns a Map<String, ExecutionContext> — one entry per partition. Each ExecutionContext carries the data range (min/max ID, date range, file name, etc.) that the worker step will use.
Range Partitioner for MySQL (by primary key)
@Component
@RequiredArgsConstructor
public class OrderIdRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Find the full range of pending orders
        Map<String, Object> range = jdbcTemplate.queryForMap(
            "SELECT MIN(order_id) AS min_id, MAX(order_id) AS max_id " +
            "FROM orders WHERE status = 'PENDING'");
        long minId = ((Number) range.get("min_id")).longValue();
        long maxId = ((Number) range.get("max_id")).longValue();
        long rangeSize = (maxId - minId) / gridSize + 1;

        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long start = minId + (i * rangeSize);
            long end = (i == gridSize - 1) ? maxId : start + rangeSize - 1;

            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minOrderId", start);
            ctx.putLong("maxOrderId", end);
            ctx.putInt("partitionIndex", i);
            partitions.put("partition-" + i, ctx);
        }
        return partitions;
    }
}
For 1,000,000 orders (IDs 1–1,000,000) with gridSize=4:
- partition-0: IDs 1–250,000
- partition-1: IDs 250,001–500,000
- partition-2: IDs 500,001–750,000
- partition-3: IDs 750,001–1,000,000
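The boundary arithmetic behind those four ranges can be checked with a few lines of plain Java (values taken from the example: IDs 1–1,000,000, gridSize 4):

```java
public class RangeMathDemo {
    public static void main(String[] args) {
        long minId = 1, maxId = 1_000_000;
        int gridSize = 4;
        // Integer division rounds down, so +1 ensures the ranges cover maxId
        long rangeSize = (maxId - minId) / gridSize + 1; // 250_000 IDs per partition

        for (int i = 0; i < gridSize; i++) {
            long start = minId + (i * rangeSize);
            long end = (i == gridSize - 1) ? maxId : start + rangeSize - 1;
            System.out.println("partition-" + i + ": IDs " + start + "-" + end);
        }
        // prints partition-0: IDs 1-250000 ... partition-3: IDs 750001-1000000
    }
}
```

The last partition is clamped to maxId so rounding never drops the tail of the key space.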
Date-based Partitioner
@Component
public class OrderDatePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Partition by month — one partition per month.
        // Note: keys are "month-JANUARY" etc., so this assumes gridSize <= 12;
        // beyond that, keys would repeat and overwrite earlier entries.
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        LocalDate start = LocalDate.of(2026, 1, 1);
        for (int i = 0; i < gridSize; i++) {
            LocalDate monthStart = start.plusMonths(i);
            LocalDate monthEnd = monthStart.plusMonths(1).minusDays(1);

            ExecutionContext ctx = new ExecutionContext();
            ctx.putString("startDate", monthStart.toString());
            ctx.putString("endDate", monthEnd.toString());
            partitions.put("month-" + monthStart.getMonth(), ctx);
        }
        return partitions;
    }
}
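The month-boundary arithmetic this partitioner relies on can be verified standalone with java.time (same 2026 start date as above):

```java
import java.time.LocalDate;

public class MonthRangeDemo {
    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2026, 1, 1);
        for (int i = 0; i < 3; i++) {
            // Same expressions as in OrderDatePartitioner
            LocalDate monthStart = start.plusMonths(i);
            LocalDate monthEnd = monthStart.plusMonths(1).minusDays(1);
            System.out.println(monthStart.getMonth() + ": " + monthStart + " to " + monthEnd);
        }
        // JANUARY: 2026-01-01 to 2026-01-31
        // FEBRUARY: 2026-02-01 to 2026-02-28
        // MARCH: 2026-03-01 to 2026-03-31
    }
}
```

plusMonths handles month lengths (and leap years) itself, which is why "first of next month minus one day" is the safe way to compute the end date.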
File-based Partitioner
@Component
public class MultiFilePartitioner implements Partitioner {

    @Value("${batch.input.dir:/data/orders}")
    private String inputDir;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Partitioner.partition() declares no checked exceptions, so the IO
        // work is wrapped and rethrown unchecked. gridSize is ignored here:
        // one partition per matching file.
        try {
            Resource[] files = new PathMatchingResourcePatternResolver()
                .getResources("file:" + inputDir + "/orders_*.csv");

            Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
            for (int i = 0; i < files.length; i++) {
                ExecutionContext ctx = new ExecutionContext();
                ctx.putString("fileName", files[i].getFile().getAbsolutePath());
                partitions.put("file-partition-" + i, ctx);
            }
            return partitions;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to resolve input files", e);
        }
    }
}
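The glob resolution that PathMatchingResourcePatternResolver performs can be sketched with plain JDK NIO, if you prefer no Spring dependency in the partitioner. A minimal sketch (the file names are hypothetical, for illustration only):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class GlobDemo {
    // Returns all files in dir matching orders_*.csv, in a stable order
    public static List<Path> listOrderFiles(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "orders_*.csv")) {
            for (Path p : stream) {
                files.add(p);
            }
        }
        Collections.sort(files); // DirectoryStream order is unspecified
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("orders");
        Files.createFile(dir.resolve("orders_2026_01.csv"));
        Files.createFile(dir.resolve("orders_2026_02.csv"));
        Files.createFile(dir.resolve("ignore.txt"));
        System.out.println(listOrderFiles(dir).size()); // prints 2
    }
}
```

Sorting matters: partition names like file-partition-0 should map to the same file on every run, or restart behavior becomes confusing.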
@StepScope Worker Step
The worker step uses @StepScope beans that read their data range from the stepExecutionContext:
@Bean
@StepScope
public JdbcPagingItemReader<Order> partitionedOrderReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minOrderId']}") Long minOrderId,
        @Value("#{stepExecutionContext['maxOrderId']}") Long maxOrderId) {

    MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
    qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
    qp.setFromClause("FROM orders");
    qp.setWhereClause("WHERE status = 'PENDING' AND order_id BETWEEN :minId AND :maxId");
    // Fully qualified: Spring Batch's Order enum clashes with the domain Order class
    qp.setSortKeys(Map.of("order_id",
        org.springframework.batch.item.database.Order.ASCENDING));

    return new JdbcPagingItemReaderBuilder<Order>()
        .name("partitionedOrderReader")
        .dataSource(dataSource)
        .queryProvider(qp)
        .parameterValues(Map.of("minId", minOrderId, "maxId", maxOrderId))
        .pageSize(200)
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .build();
}
For a file-based partition:
@Bean
@StepScope
public FlatFileItemReader<Order> partitionedFileReader(
        @Value("#{stepExecutionContext['fileName']}") String fileName) {
    return new FlatFileItemReaderBuilder<Order>()
        .name("partitionedFileReader")
        .resource(new FileSystemResource(fileName))
        .lineMapper(lineMapper())
        .linesToSkip(1)
        .build();
}
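The lineMapper() helper is not shown above. A minimal sketch, assuming a comma-delimited file whose columns map onto the Order bean's properties (the field names here are assumptions, not part of the original example):

```java
@Bean
public LineMapper<Order> lineMapper() {
    DefaultLineMapper<Order> mapper = new DefaultLineMapper<>();

    // Tokenize each CSV line into named fields (assumed column order)
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("orderId", "customerId", "amount", "orderDate", "status");
    mapper.setLineTokenizer(tokenizer);

    // Bind the named fields onto Order via its setters
    BeanWrapperFieldSetMapper<Order> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Order.class);
    mapper.setFieldSetMapper(fieldSetMapper);

    return mapper;
}
```

Adjust the tokenizer names to your actual header row; with linesToSkip(1) the header itself is never mapped.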
Local Partitioning with TaskExecutorPartitionHandler
For local (single JVM) partitioning, use TaskExecutorPartitionHandler:
@Bean
public Step workerStep(JobRepository jobRepository,
                       PlatformTransactionManager tx) {
    return new StepBuilder("workerStep", jobRepository)
        .<Order, ProcessedOrder>chunk(200, tx)
        .reader(partitionedOrderReader(null, null, null)) // injected via @StepScope
        .processor(orderProcessor())
        .writer(processedOrderWriter())
        .build();
}

@Bean
public PartitionHandler localPartitionHandler(Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setStep(workerStep);
    handler.setGridSize(4); // 4 parallel partitions
    handler.setTaskExecutor(partitionTaskExecutor());
    return handler;
}

@Bean
public TaskExecutor partitionTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("partition-");
    executor.initialize();
    return executor;
}

@Bean
public Step managerStep(JobRepository jobRepository,
                        OrderIdRangePartitioner partitioner,
                        PartitionHandler partitionHandler) {
    return new StepBuilder("managerStep", jobRepository)
        .partitioner("workerStep", partitioner) // step name must match the worker step name
        .partitionHandler(partitionHandler)
        .build();
}

@Bean
public Job partitionedImportJob(JobRepository jobRepository, Step managerStep) {
    return new JobBuilder("partitionedImportJob", jobRepository)
        .start(managerStep)
        .build();
}
When the job runs:
- managerStep calls partitioner.partition(4) → 4 ExecutionContext maps.
- TaskExecutorPartitionHandler submits 4 worker step executions to the thread pool.
- Each worker runs independently with its own StepExecution and data range.
- The manager waits for all workers to complete.
- If any worker fails, the manager step fails — other workers continue until complete.
Monitoring Partition Progress
Each partition creates a row in BATCH_STEP_EXECUTION:
SELECT
se.STEP_NAME,
sec.SHORT_CONTEXT,
se.STATUS,
se.READ_COUNT,
se.WRITE_COUNT,
se.START_TIME,
se.END_TIME
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_STEP_EXECUTION_CONTEXT sec ON se.STEP_EXECUTION_ID = sec.STEP_EXECUTION_ID
WHERE se.JOB_EXECUTION_ID = ?
ORDER BY se.STEP_EXECUTION_ID;
Result shows one row per partition:
workerStep:partition-0 {"minOrderId":1, "maxOrderId":250000} COMPLETED 250000 249800
workerStep:partition-1 {"minOrderId":250001, "maxOrderId":500000} COMPLETED 250000 249950
workerStep:partition-2 {"minOrderId":500001, "maxOrderId":750000} FAILED 150000 149800
workerStep:partition-3 {"minOrderId":750001, "maxOrderId":1000000} COMPLETED 250000 249900
Partition 2 failed at ID ~650,000. On restart, only partition 2 re-runs — partitions 0, 1, and 3 are already COMPLETED and are skipped.
Choosing Grid Size
gridSize ≈ min(available_threads, data_set / chunk_size)
Practical rules:
- Start with gridSize = available_cores (e.g., 4 or 8)
- Each partition should have at least 10 chunks of work — otherwise partition overhead dominates
- For MySQL, gridSize should not exceed HikariCP maximum-pool-size / 2 (leave connections for other operations)
# 8 partitions × 2 connections each
spring.datasource.hikari.maximum-pool-size=16
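These sizing rules can be sanity-checked numerically. Using the figures from this article (1,000,000 rows, chunk size 200, 8 available cores):

```java
public class GridSizeDemo {
    public static void main(String[] args) {
        long rows = 1_000_000;
        int chunkSize = 200;
        int availableThreads = 8;

        long totalChunks = rows / chunkSize;                     // 5_000 chunks of work
        long gridSize = Math.min(availableThreads, totalChunks); // formula from above: 8
        long chunksPerPartition = totalChunks / gridSize;        // 625, well above the 10-chunk floor

        System.out.println("gridSize=" + gridSize
            + ", chunks/partition=" + chunksPerPartition);
        // prints gridSize=8, chunks/partition=625
    }
}
```

With 625 chunks per partition, partition startup cost is negligible; if this number dropped below ~10, a smaller gridSize (or plain multi-threading) would be the better choice.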
Key Takeaways
- Partitioning splits data into independent slices before processing. Each slice gets its own StepExecution, metadata, and restart capability.
- Implement Partitioner.partition(gridSize) to produce one ExecutionContext per partition with the data range (min/max ID, date range, file path).
- Worker steps use @StepScope readers that read their range from stepExecutionContext.
- TaskExecutorPartitionHandler runs partitions locally on a thread pool. gridSize controls parallelism.
- On restart, only FAILED partitions re-run. Completed partitions are skipped — this is the key advantage over multi-threaded steps.
What’s Next
Article 22 extends partitioning across multiple JVMs using remote partitioning and remote chunking with Kafka — the pattern for truly large-scale batch processing.