Partitioning: Splitting Work Across Parallel Workers

Introduction

Multi-threaded steps (Article 20) run multiple chunks concurrently from a single reader. Partitioning is different — it splits the data into independent slices before processing starts, then runs each slice as its own StepExecution with its own reader, processor, writer, and metadata.

This gives you:

  • True independence between partitions — one partition’s failure doesn’t affect others
  • A separate StepExecution row per partition in BATCH_STEP_EXECUTION — full visibility into per-partition progress
  • The ability to distribute partitions across multiple JVMs (remote partitioning, covered in Article 22)

Partitioning Architecture

ManagerStep (PartitionStep)
  │
  ├── Partitioner         → creates N ExecutionContexts (one per partition)
  ├── PartitionHandler    → distributes partitions to workers
  │
  └── Worker Steps (run per partition)
        ├── ItemReader    → reads only its slice of data
        ├── ItemProcessor
        └── ItemWriter

The manager step runs once: it calls Partitioner.partition(gridSize) to produce N ExecutionContext maps, then hands them to the PartitionHandler. The PartitionHandler runs N worker step executions — locally on a thread pool or remotely across JVMs.


Implementing a Partitioner

A Partitioner returns a Map<String, ExecutionContext> — one entry per partition. Each ExecutionContext carries the data range (min/max ID, date range, file name, etc.) that the worker step will use.

Range Partitioner for MySQL (by primary key)

@Component
@RequiredArgsConstructor
public class OrderIdRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Find the full range of pending orders
        Map<String, Object> range = jdbcTemplate.queryForMap(
                "SELECT MIN(order_id) AS min_id, MAX(order_id) AS max_id " +
                "FROM orders WHERE status = 'PENDING'");

        // Assumes at least one PENDING row exists — min_id/max_id come back NULL otherwise
        long minId = ((Number) range.get("min_id")).longValue();
        long maxId = ((Number) range.get("max_id")).longValue();
        // Divide the ID span evenly; the last partition absorbs any remainder
        long rangeSize = (maxId - minId) / gridSize + 1;

        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long start = minId + (i * rangeSize);
            long end   = (i == gridSize - 1) ? maxId : start + rangeSize - 1;

            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minOrderId", start);
            ctx.putLong("maxOrderId", end);
            ctx.putInt("partitionIndex", i);

            partitions.put("partition-" + i, ctx);
        }
        return partitions;
    }
}

For 1,000,000 orders (IDs 1–1,000,000) with gridSize=4:

  • partition-0: IDs 1–250,000
  • partition-1: IDs 250,001–500,000
  • partition-2: IDs 500,001–750,000
  • partition-3: IDs 750,001–1,000,000
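The range arithmetic is easy to check in isolation. The following is a dependency-free sketch of the same loop (class and method names are illustrative, not Spring Batch API), using the example numbers above:

```java
// Dependency-free sketch of OrderIdRangePartitioner's range arithmetic.
public class RangeSketch {

    // Returns {start, end} pairs, one per partition.
    static long[][] ranges(long minId, long maxId, int gridSize) {
        long rangeSize = (maxId - minId) / gridSize + 1;
        long[][] out = new long[gridSize][2];
        for (int i = 0; i < gridSize; i++) {
            long start = minId + (long) i * rangeSize;
            long end = (i == gridSize - 1) ? maxId : start + rangeSize - 1;
            out[i][0] = start;
            out[i][1] = end;
        }
        return out;
    }

    public static void main(String[] args) {
        for (long[] r : ranges(1, 1_000_000, 4)) {
            System.out.println(r[0] + "-" + r[1]);   // 1-250000, 250001-500000, ...
        }
    }
}
```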

Date-based Partitioner

@Component
public class OrderDatePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Partition by month — one partition per month
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        LocalDate start = LocalDate.of(2026, 1, 1);

        for (int i = 0; i < gridSize; i++) {
            LocalDate monthStart = start.plusMonths(i);
            LocalDate monthEnd   = monthStart.plusMonths(1).minusDays(1);

            ExecutionContext ctx = new ExecutionContext();
            ctx.putString("startDate", monthStart.toString());
            ctx.putString("endDate",   monthEnd.toString());

            partitions.put("month-" + monthStart.getMonth(), ctx);
        }
        return partitions;
    }
}
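The month-window arithmetic can also be verified standalone with java.time (the class name here is illustrative):

```java
import java.time.LocalDate;

// Sketch of OrderDatePartitioner's month-window arithmetic.
public class MonthWindowSketch {

    // Returns {start, end} of the i-th month window from the base date.
    static LocalDate[] window(LocalDate base, int i) {
        LocalDate monthStart = base.plusMonths(i);
        LocalDate monthEnd   = monthStart.plusMonths(1).minusDays(1);
        return new LocalDate[] { monthStart, monthEnd };
    }

    public static void main(String[] args) {
        LocalDate base = LocalDate.of(2026, 1, 1);
        for (int i = 0; i < 3; i++) {
            LocalDate[] w = window(base, i);
            System.out.println(w[0] + " .. " + w[1]);
        }
    }
}
```

One caveat with the partitioner above: keys like "month-JANUARY" repeat once gridSize exceeds 12, silently overwriting earlier map entries. Including the year in the key (e.g., "month-2026-01") avoids the collision.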

File-based Partitioner

@Component
public class MultiFilePartitioner implements Partitioner {

    @Value("${batch.input.dir:/data/orders}")
    private String inputDir;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Partitioner.partition declares no checked exceptions, so wrap the IOException
        try {
            Resource[] files = new PathMatchingResourcePatternResolver()
                    .getResources("file:" + inputDir + "/orders_*.csv");

            Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
            for (int i = 0; i < files.length; i++) {
                ExecutionContext ctx = new ExecutionContext();
                ctx.putString("fileName", files[i].getFile().getAbsolutePath());
                partitions.put("file-partition-" + i, ctx);
            }
            return partitions;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to resolve input files in " + inputDir, e);
        }
    }
}
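Note that this partitioner effectively ignores gridSize: the number of matching files determines the partition count. The mapping itself can be sketched without Spring, over an in-memory file list (class and method names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch of MultiFilePartitioner's name -> file mapping.
public class FilePartitionSketch {

    // One partition entry per file matching the orders_*.csv pattern.
    static Map<String, String> partition(List<String> files) {
        Map<String, String> partitions = new LinkedHashMap<>();
        int i = 0;
        for (String f : files) {
            if (f.matches(".*orders_.*\\.csv")) {
                partitions.put("file-partition-" + i++, f);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(partition(List.of(
                "/data/orders/orders_2026_01.csv",
                "/data/orders/orders_2026_02.csv",
                "/data/orders/readme.txt")));
    }
}
```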

@StepScope Worker Step

The worker step uses @StepScope beans that read their data range from the stepExecutionContext:

@Bean
@StepScope
public JdbcPagingItemReader<Order> partitionedOrderReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minOrderId']}") Long minOrderId,
        @Value("#{stepExecutionContext['maxOrderId']}") Long maxOrderId) {

    MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
    qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
    qp.setFromClause("FROM orders");
    qp.setWhereClause("WHERE status = 'PENDING' AND order_id BETWEEN :minId AND :maxId");
    qp.setSortKeys(Map.of("order_id",
            org.springframework.batch.item.database.Order.ASCENDING)); // fully qualified to avoid a clash with the Order domain class

    return new JdbcPagingItemReaderBuilder<Order>()
            .name("partitionedOrderReader")
            .dataSource(dataSource)
            .queryProvider(qp)
            .parameterValues(Map.of("minId", minOrderId, "maxId", maxOrderId))
            .pageSize(200)
            .rowMapper(new BeanPropertyRowMapper<>(Order.class))
            .build();
}

For a file-based partition:

@Bean
@StepScope
public FlatFileItemReader<Order> partitionedFileReader(
        @Value("#{stepExecutionContext['fileName']}") String fileName) {

    return new FlatFileItemReaderBuilder<Order>()
            .name("partitionedFileReader")
            .resource(new FileSystemResource(fileName))
            .lineMapper(lineMapper())   // assumes a LineMapper<Order> defined elsewhere (e.g., a DefaultLineMapper with a DelimitedLineTokenizer)
            .linesToSkip(1)
            .build();
}

Local Partitioning with TaskExecutorPartitionHandler

For local (single JVM) partitioning, use TaskExecutorPartitionHandler:

@Bean
public Step workerStep(JobRepository jobRepository,
                        PlatformTransactionManager tx) {

    return new StepBuilder("workerStep", jobRepository)
            .<Order, ProcessedOrder>chunk(200, tx)
            .reader(partitionedOrderReader(null, null, null))   // placeholder args; the @StepScope proxy injects real values at runtime
            .processor(orderProcessor())
            .writer(processedOrderWriter())
            .build();
}

@Bean
public PartitionHandler localPartitionHandler(Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setStep(workerStep);
    handler.setGridSize(4);                // 4 parallel partitions
    handler.setTaskExecutor(partitionTaskExecutor());
    return handler;
}

@Bean
public TaskExecutor partitionTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("partition-");
    executor.initialize();
    return executor;
}

@Bean
public Step managerStep(JobRepository jobRepository,
                         OrderIdRangePartitioner partitioner,
                         PartitionHandler partitionHandler) {

    return new StepBuilder("managerStep", jobRepository)
            .partitioner("workerStep", partitioner)   // step name must match worker step name
            .partitionHandler(partitionHandler)
            .build();
}

@Bean
public Job partitionedImportJob(JobRepository jobRepository, Step managerStep) {
    return new JobBuilder("partitionedImportJob", jobRepository)
            .start(managerStep)
            .build();
}

When the job runs:

  1. managerStep calls partitioner.partition(4) → 4 ExecutionContext maps.
  2. TaskExecutorPartitionHandler submits 4 worker step executions to the thread pool.
  3. Each worker runs independently with its own StepExecution and data range.
  4. Manager waits for all workers to complete.
  5. If any worker fails, the already-running workers still finish; the manager step then reports FAILED.
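The fan-out-and-wait behavior in these steps can be mimicked in plain java.util.concurrent. This is an illustrative sketch of the semantics, not the TaskExecutorPartitionHandler implementation; all names are invented:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: submit all workers, wait for all, and surface per-worker outcomes.
// One worker failing does not cancel the others.
public class FanOutSketch {

    static List<String> runAll(List<Callable<String>> workers, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = pool.invokeAll(workers); // blocks until all finish
            return futures.stream()
                    .map(f -> {
                        try {
                            return f.get();
                        } catch (Exception e) {
                            return "FAILED";    // a failed worker marks only itself
                        }
                    })
                    .toList();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> workers = List.of(
                () -> "COMPLETED",
                () -> { throw new IllegalStateException("partition 1 blew up"); },
                () -> "COMPLETED");
        System.out.println(runAll(workers, 3));
    }
}
```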

Monitoring Partition Progress

Each partition creates a row in BATCH_STEP_EXECUTION:

SELECT
    se.STEP_NAME,
    sec.SHORT_CONTEXT,
    se.STATUS,
    se.READ_COUNT,
    se.WRITE_COUNT,
    se.START_TIME,
    se.END_TIME
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_STEP_EXECUTION_CONTEXT sec ON se.STEP_EXECUTION_ID = sec.STEP_EXECUTION_ID
WHERE se.JOB_EXECUTION_ID = ?
ORDER BY se.STEP_EXECUTION_ID;

Result shows one row per partition:

workerStep:partition-0  {"minOrderId":1, "maxOrderId":250000}      COMPLETED  250000  249800
workerStep:partition-1  {"minOrderId":250001, "maxOrderId":500000}  COMPLETED  250000  249950
workerStep:partition-2  {"minOrderId":500001, "maxOrderId":750000}  FAILED     150000  149800
workerStep:partition-3  {"minOrderId":750001, "maxOrderId":1000000} COMPLETED  250000  249900

Partition 2 failed at ID ~650,000. On restart, only partition 2 re-runs — partitions 0, 1, and 3 are already COMPLETED and are skipped.


Choosing Grid Size

gridSize ≈ min(available_cores, total_items / (chunk_size × chunks_per_partition))

Practical rules:

  • Start with gridSize = available_cores (e.g., 4 or 8)
  • Each partition should have at least 10 chunks of work — otherwise partition overhead dominates
  • For MySQL, gridSize should not exceed HikariCP's maximum-pool-size / 2 (leave connections for other operations):

    spring.datasource.hikari.maximum-pool-size=16  # 8 partitions × 2 connections each
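The heuristic can be sketched as a small helper. suggestGridSize and its parameters are illustrative names, not Spring Batch API; it combines the core cap with the "at least N chunks per partition" rule:

```java
// Sketch of the grid-size heuristic: cap by cores, and ensure each
// partition gets at least minChunksPerPartition chunks of work.
public class GridSizeSketch {

    static int suggestGridSize(int cores, long itemCount, int chunkSize,
                               int minChunksPerPartition) {
        long maxUsefulPartitions = itemCount / ((long) chunkSize * minChunksPerPartition);
        return (int) Math.max(1, Math.min(cores, maxUsefulPartitions));
    }

    public static void main(String[] args) {
        // 1,000,000 items, chunk 200, 8 cores, >= 10 chunks/partition
        System.out.println(suggestGridSize(8, 1_000_000, 200, 10));
        // Small data set: only one partition is worthwhile
        System.out.println(suggestGridSize(8, 3_000, 200, 10));
    }
}
```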

Key Takeaways

  • Partitioning splits data into independent slices before processing. Each slice gets its own StepExecution, metadata, and restart capability.
  • Implement Partitioner.partition(gridSize) to produce one ExecutionContext per partition with the data range (min/max ID, date range, file path).
  • Worker steps use @StepScope readers that read their range from stepExecutionContext.
  • TaskExecutorPartitionHandler runs partitions locally on a thread pool. gridSize controls parallelism.
  • On restart, only FAILED partitions re-run. Completed partitions are skipped — this is the key advantage over multi-threaded steps.

What’s Next

Article 22 extends partitioning across multiple JVMs using remote partitioning and remote chunking with Kafka — the pattern for truly large-scale batch processing.