Remote Partitioning and Remote Chunking with Kafka

Introduction

Local partitioning (Article 21) runs all workers in a single JVM. When that machine becomes the bottleneck — CPU, memory, or network bandwidth — you need workers on separate machines. Spring Batch Integration provides two patterns for this:

Pattern                What distributes                           Coordinator controls           Workers do
Remote Partitioning    Partition descriptors (small messages)     Data splitting, aggregation    Full read-process-write per partition
Remote Chunking        Actual items (larger messages)             Reading                        Processing + writing only

Remote partitioning is the more common choice — workers read directly from the database or file, so only small partition metadata crosses the network. Remote chunking sends the items themselves over the wire — use it when workers have no access to the source data and processing or writing, not reading, is the bottleneck.


Dependencies

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
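
All three artifacts are covered by Spring Boot's dependency management on Boot 3.x, so explicit <version> tags can usually be omitted.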

Remote Partitioning Architecture

Manager JVM                              Worker JVM(s)
──────────────────                       ──────────────────────────────
ManagerStep
  │
  ├─ Partitioner          ──requests──►  partition-requests topic
  │   (creates N contexts)
  │
  └─ MessageChannelPartitionHandler
       │
       └─ waits for replies ◄──replies── partition-replies topic
                                               │
                                           WorkerStep (per partition)
                                               ├─ reads its slice from DB
                                               ├─ processes
                                               └─ writes results

Kafka Topics Setup

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic partitionRequestsTopic() {
        return TopicBuilder.name("partition-requests")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic partitionRepliesTopic() {
        return TopicBuilder.name("partition-replies")
                .partitions(10)
                .replicas(1)
                .build();
    }
}
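
A sizing note: workers share one consumer group, so Kafka assigns each worker a subset of the request topic's partitions. The partition count therefore caps worker parallelism — keep it at or above the number of worker instances you plan to run (10 here leaves headroom beyond the grid size of 4 used below).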

Manager Configuration

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class ManagerBatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager tx;
    private final DataSource dataSource;

    // ── Partitioner ─────────────────────────────────────────────────────────

    @Bean
    public OrderIdRangePartitioner partitioner() {
        return new OrderIdRangePartitioner(new JdbcTemplate(dataSource));
    }

    // ── Kafka Channels ──────────────────────────────────────────────────────

    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }

    @Bean
    public QueueChannel repliesChannel() {
        return new QueueChannel();
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> repliesAdapter(
            ConsumerFactory<String, String> consumerFactory) {

        ContainerProperties containerProps = new ContainerProperties("partition-replies");
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProps);

        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        adapter.setOutputChannel(repliesChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestsChannel")
    public MessageHandler kafkaRequestsHandler(
            KafkaTemplate<String, String> kafkaTemplate) {

        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);
        handler.setTopicExpression(new LiteralExpression("partition-requests"));
        return handler;
    }

    // ── Partition Handler ────────────────────────────────────────────────────

    @Bean
    public PartitionHandler remotePartitionHandler() throws Exception {
        MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
        handler.setStepName("workerStep");
        handler.setGridSize(4);
        handler.setReplyChannel(repliesChannel());
        handler.setMessagingOperations(messagingTemplate());
        handler.setPollInterval(2000);        // applies when polling the job repository for results
        handler.setTimeout(3_600_000);        // with a reply channel, the MessagingTemplate receiveTimeout below bounds the wait
        handler.afterPropertiesSet();
        return handler;
    }

    @Bean
    public MessagingTemplate messagingTemplate() {
        MessagingTemplate template = new MessagingTemplate();
        template.setDefaultChannel(requestsChannel());
        template.setReceiveTimeout(3_600_000);
        return template;
    }

    // ── Manager Step ────────────────────────────────────────────────────────

    @Bean
    public Step managerStep() throws Exception {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner())
                .partitionHandler(remotePartitionHandler())
                .build();
    }

    @Bean
    public Job remotePartitionedJob() throws Exception {
        return new JobBuilder("remotePartitionedJob", jobRepository)
                .start(managerStep())
                .build();
    }
}
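
The OrderIdRangePartitioner injected above does the actual data splitting. A minimal sketch, assuming a numeric order_id primary key: it divides the full ID range into gridSize contiguous slices and exposes each slice via the minOrderId/maxOrderId keys that the worker reader binds to.

public class OrderIdRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public OrderIdRangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long min = jdbcTemplate.queryForObject(
                "SELECT MIN(order_id) FROM orders WHERE status = 'PENDING'", Long.class);
        Long max = jdbcTemplate.queryForObject(
                "SELECT MAX(order_id) FROM orders WHERE status = 'PENDING'", Long.class);

        Map<String, ExecutionContext> partitions = new HashMap<>();
        if (min == null || max == null) {
            return partitions;                        // nothing to process
        }

        long sliceSize = (max - min) / gridSize + 1;  // contiguous, non-overlapping slices
        long start = min;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minOrderId", start);
            ctx.putLong("maxOrderId", Math.min(start + sliceSize - 1, max));
            partitions.put("partition" + i, ctx);
            start += sliceSize;
        }
        return partitions;
    }
}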

Worker Configuration

Workers listen on the requests topic, execute the step, and reply on the replies topic.

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class WorkerBatchConfig {

    private final JobRepository jobRepository;
    private final JobExplorer jobExplorer;   // used by the StepExecutionRequestHandler below
    private final PlatformTransactionManager tx;
    private final DataSource dataSource;

    // ── Worker Step ─────────────────────────────────────────────────────────

    @Bean
    @StepScope
    public JdbcPagingItemReader<Order> workerOrderReader(
            @Value("#{stepExecutionContext['minOrderId']}") Long minId,
            @Value("#{stepExecutionContext['maxOrderId']}") Long maxId) {

        MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
        qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
        qp.setFromClause("FROM orders");
        qp.setWhereClause("WHERE order_id BETWEEN :minId AND :maxId AND status = 'PENDING'");
        qp.setSortKeys(Map.of("order_id",
                org.springframework.batch.item.database.Order.ASCENDING)); // qualified — the Order entity shadows the sort enum

        return new JdbcPagingItemReaderBuilder<Order>()
                .name("workerOrderReader")
                .dataSource(dataSource)
                .queryProvider(qp)
                .parameterValues(Map.of("minId", minId, "maxId", maxId))
                .pageSize(200)
                .rowMapper(new BeanPropertyRowMapper<>(Order.class))
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<ProcessedOrder> workerWriter() {
        return new JdbcBatchItemWriterBuilder<ProcessedOrder>()
                .dataSource(dataSource)
                .sql("INSERT INTO processed_orders (order_id, customer_id, final_amount, status) " +
                     "VALUES (:orderId, :customerId, :finalAmount, :status) " +
                     "ON DUPLICATE KEY UPDATE status = VALUES(status), final_amount = VALUES(final_amount)")
                .beanMapped()
                .assertUpdates(false)
                .build();
    }

    @Bean
    public Step workerStep() {
        return new StepBuilder("workerStep", jobRepository)
                .<Order, ProcessedOrder>chunk(200, tx)
                .reader(workerOrderReader(null, null))
                .processor(orderProcessor())
                .writer(workerWriter())
                .build();
    }

    // ── Kafka Integration ────────────────────────────────────────────────────

    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel repliesChannel() {
        return new DirectChannel();
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> requestsAdapter(
            ConsumerFactory<String, String> consumerFactory) {

        ContainerProperties props = new ContainerProperties("partition-requests");
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, props);

        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        adapter.setOutputChannel(requestsChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestsChannel",
                      outputChannel = "repliesChannel")
    public StepExecutionRequestHandler stepExecutionRequestHandler() {
        StepExecutionRequestHandler handler = new StepExecutionRequestHandler();
        handler.setJobExplorer(jobExplorer);
        handler.setStepLocator(stepLocator());
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "repliesChannel")
    public MessageHandler kafkaRepliesHandler(
            KafkaTemplate<String, String> kafkaTemplate) {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);
        handler.setTopicExpression(new LiteralExpression("partition-replies"));
        return handler;
    }

    @Bean
    public BeanFactoryStepLocator stepLocator() {
        return new BeanFactoryStepLocator();
    }
}
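
One subtlety: the adapters are typed <String, String> for brevity, but the payloads that actually cross Kafka are StepExecutionRequest and StepExecution objects. The JSON (de)serializers configured in application.properties below handle the conversion, which is why spring.json.trusted.packages has to cover the Spring Batch classes.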

Remote Chunking Overview

Remote chunking keeps the manager doing the reading; workers only process and write.

Manager JVM                           Worker JVM(s)
─────────────────────                 ──────────────────────────────
ManagerStep
  │
  ├─ ItemReader (reads from DB/file)
  │   │
  │   └─ sends chunks to ──────────►  chunk-requests topic
  │                                       │
  └─ waits for acks ◄──────────────── chunk-replies topic
                                          │
                                      Worker
                                          ├─ ItemProcessor
                                          └─ ItemWriter (writes to DB)

Use remote chunking when:

  • Reading is not the bottleneck (the manager can keep up)
  • Workers have no direct database access to the source
  • You want to scale processing and writing horizontally, independent of reading

Remote chunking sends the actual items over Kafka — keep them small or use a compact serialization format.

// Manager side — sends items to workers
@Bean
public ChunkMessageChannelItemWriter<Order> remoteChunkWriter(
        MessagingTemplate messagingTemplate) {
    ChunkMessageChannelItemWriter<Order> writer = new ChunkMessageChannelItemWriter<>();
    writer.setMessagingOperations(messagingTemplate);
    writer.setReplyChannel(repliesChannel());
    return writer;
}

// Manager step — reads normally, delegates writing to workers
@Bean
public Step remoteChunkManagerStep() {
    return new StepBuilder("remoteChunkManagerStep", jobRepository)
            .<Order, Order>chunk(100, tx)
            .reader(orderReader())
            .writer(remoteChunkWriter(messagingTemplate()))
            .build();
}

// Worker side — receives chunks, processes, writes
@Bean
@ServiceActivator(inputChannel = "chunkRequestsChannel",
                  outputChannel = "chunkRepliesChannel")
public ChunkProcessorChunkHandler<Order> chunkHandler() {
    ChunkProcessorChunkHandler<Order> handler = new ChunkProcessorChunkHandler<>();
    handler.setChunkProcessor(new SimpleChunkProcessor<>(
            orderProcessor(), orderWriter()));
    return handler;
}
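
The worker's Kafka wiring for these channels is not shown above; it mirrors the partitioning adapters. A sketch, assuming chunk-requests and chunk-replies topics and channel names matching the chunkHandler() service activator:

@Bean
public DirectChannel chunkRequestsChannel() {
    return new DirectChannel();
}

@Bean
public DirectChannel chunkRepliesChannel() {
    return new DirectChannel();
}

@Bean
public KafkaMessageDrivenChannelAdapter<String, String> chunkRequestsAdapter(
        ConsumerFactory<String, String> consumerFactory) {
    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(
                    consumerFactory, new ContainerProperties("chunk-requests"));
    KafkaMessageDrivenChannelAdapter<String, String> adapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setOutputChannel(chunkRequestsChannel());   // feeds chunkHandler()
    return adapter;
}

@Bean
@ServiceActivator(inputChannel = "chunkRepliesChannel")
public MessageHandler chunkRepliesHandler(KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("chunk-replies"));
    return handler;
}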

application.properties — Worker

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=batch-workers
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Shared batch metadata DB (all JVMs must point to the same DB)
spring.datasource.url=jdbc:mysql://db-host:3306/batch_db
spring.batch.jdbc.initialize-schema=never
# Workers respond to messages — they don't launch jobs themselves
spring.batch.job.enabled=false
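
The manager's properties are largely the same — a sketch, with illustrative hosts and group id; the key difference is that the manager does launch the job:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=batch-manager
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.datasource.url=jdbc:mysql://db-host:3306/batch_db
spring.batch.jdbc.initialize-schema=never
# The manager launches the job, so the default (true) is kept
spring.batch.job.enabled=true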

Deployment Topology

┌─────────────────────┐      Kafka      ┌─────────────────────┐
│   Manager Pod       │ ──requests──►   │   Worker Pod 1      │
│   (1 instance)      │ ◄──replies───   │   (N instances)     │
│                     │                 ├─────────────────────┤
│   Runs job          │                 │   Worker Pod 2      │
│   Splits partitions │                 ├─────────────────────┤
│   Aggregates results│                 │   Worker Pod 3      │
└─────────────────────┘                 └─────────────────────┘
          │                                       │
          └───────────────────────────────────────┘
                    Shared MySQL (batch metadata + data)
  • Manager and workers share the same MySQL for batch metadata.
  • Workers read the source data directly from MySQL (remote partitioning) or receive items from Kafka (remote chunking).
  • Scale workers horizontally — add pods to increase throughput.
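
One operational note: after a failure and restart, failed partitions are re-executed, so a worker may process the same slice twice. The worker writer above guards against this with an idempotent upsert (ON DUPLICATE KEY UPDATE) rather than a plain INSERT.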

Key Takeaways

  • Remote partitioning: manager sends small partition descriptors over Kafka; workers read data directly. Best for database-backed jobs.
  • Remote chunking: manager reads and sends items over Kafka; workers process and write. Best when reading is trivial and writing is the bottleneck.
  • All JVMs must share the same Spring Batch metadata database — coordination depends on it.
  • MessageChannelPartitionHandler on the manager side and StepExecutionRequestHandler on the worker side are the two key integration beans.
  • Workers should set spring.batch.job.enabled=false — they respond to messages, they do not launch jobs themselves.

What’s Next

Part 9 (Scaling) is complete. Article 23 starts Part 10 — Production. You will learn how to schedule batch jobs with @Scheduled, Quartz Scheduler, and clustered scheduling for high-availability deployments.