Remote Partitioning and Remote Chunking with Kafka
Introduction
Local partitioning (Article 21) runs all workers on one JVM. When a single machine is the bottleneck — CPU, memory, or network bandwidth — you need workers on separate machines. Spring Batch Integration provides two patterns for this:
| Pattern | What distributes | Coordinator controls | Workers do |
|---|---|---|---|
| Remote Partitioning | Partition descriptors (small messages) | Data splitting, aggregation | Full read-process-write per partition |
| Remote Chunking | Actual items (larger messages) | Reading | Processing + writing only |
Remote partitioning is the more common choice: workers read directly from the database or file, so only small partition metadata crosses the network. Remote chunking sends the items themselves over the wire; use it when reading is cheap, processing or writing is the bottleneck, and workers need no direct access to the source data.
Dependencies
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
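With Spring Boot's dependency management, no explicit <version> tags are needed; the Boot BOM supplies compatible versions for all three artifacts.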
Remote Partitioning Architecture
Manager JVM Worker JVM(s)
────────────────── ──────────────────────────────
ManagerStep
│
├─ Partitioner ──requests──► partition-requests topic
│ (creates N contexts)
│
└─ MessageChannelPartitionHandler
│
└─ waits for replies ◄──replies── partition-replies topic
│
WorkerStep (per partition)
├─ reads its slice from DB
├─ processes
└─ writes results
Kafka Topics Setup
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic partitionRequestsTopic() {
return TopicBuilder.name("partition-requests")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public NewTopic partitionRepliesTopic() {
return TopicBuilder.name("partition-replies")
.partitions(10)
.replicas(1)
.build();
}
}
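The partition count of the requests topic caps how many workers in one consumer group can receive requests in parallel, so size it at or above your maximum worker count. Here 10 partitions comfortably cover the gridSize of 4 used below.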
Manager Configuration
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class ManagerBatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
// ── Partitioner ─────────────────────────────────────────────────────────
@Bean
public OrderIdRangePartitioner partitioner() {
return new OrderIdRangePartitioner(new JdbcTemplate(dataSource));
}
// ── Kafka Channels ──────────────────────────────────────────────────────
@Bean
public DirectChannel requestsChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel repliesChannel() {
return new QueueChannel();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> repliesAdapter(
ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("partition-replies");
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
KafkaMessageDrivenChannelAdapter<String, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setOutputChannel(repliesChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "requestsChannel")
public MessageHandler kafkaRequestsHandler(
KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("partition-requests"));
return handler;
}
// ── Partition Handler ────────────────────────────────────────────────────
@Bean
public PartitionHandler remotePartitionHandler() throws Exception {
MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
handler.setStepName("workerStep");
handler.setGridSize(4);
handler.setReplyChannel(repliesChannel());
handler.setMessagingOperations(messagingTemplate());
handler.setPollInterval(2000); // only used if the handler polls the job repository instead of a reply channel
handler.setTimeout(3_600_000); // 1 hour max wait
handler.afterPropertiesSet();
return handler;
}
@Bean
public MessagingTemplate messagingTemplate() {
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(requestsChannel());
template.setReceiveTimeout(3_600_000);
return template;
}
// ── Manager Step ────────────────────────────────────────────────────────
@Bean
public Step managerStep() throws Exception {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", partitioner())
.partitionHandler(remotePartitionHandler())
.build();
}
@Bean
public Job remotePartitionedJob() throws Exception {
return new JobBuilder("remotePartitionedJob", jobRepository)
.start(managerStep())
.build();
}
}
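The OrderIdRangePartitioner referenced above is not shown in this article. A minimal sketch, assuming an orders table with a numeric order_id primary key, splits the ID range evenly and stores the minOrderId/maxOrderId keys that the step-scoped worker reader expects:
public class OrderIdRangePartitioner implements Partitioner {
    private final JdbcTemplate jdbcTemplate;
    public OrderIdRangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long min = jdbcTemplate.queryForObject("SELECT MIN(order_id) FROM orders", Long.class);
        Long max = jdbcTemplate.queryForObject("SELECT MAX(order_id) FROM orders", Long.class);
        Map<String, ExecutionContext> partitions = new HashMap<>();
        if (min == null || max == null) {
            return partitions; // empty table — nothing to partition
        }
        long rangeSize = (max - min) / gridSize + 1;
        long start = min;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minOrderId", start);                            // key read by the worker reader
            context.putLong("maxOrderId", Math.min(start + rangeSize - 1, max));
            partitions.put("partition" + i, context);
            start += rangeSize;
        }
        return partitions;
    }
}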
Worker Configuration
Workers listen on the requests topic, execute the step, and reply on the replies topic.
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class WorkerBatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
private final JobExplorer jobExplorer; // required by StepExecutionRequestHandler below
// ── Worker Step ─────────────────────────────────────────────────────────
@Bean
@StepScope
public JdbcPagingItemReader<Order> workerOrderReader(
@Value("#{stepExecutionContext['minOrderId']}") Long minId,
@Value("#{stepExecutionContext['maxOrderId']}") Long maxId) {
MySqlPagingQueryProvider qp = new MySqlPagingQueryProvider();
qp.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
qp.setFromClause("FROM orders");
qp.setWhereClause("WHERE order_id BETWEEN :minId AND :maxId AND status = 'PENDING'");
qp.setSortKeys(Map.of("order_id", org.springframework.batch.item.database.Order.ASCENDING)); // fully qualified to avoid the clash with the Order domain class
return new JdbcPagingItemReaderBuilder<Order>()
.name("workerOrderReader")
.dataSource(dataSource)
.queryProvider(qp)
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.pageSize(200)
.rowMapper(new BeanPropertyRowMapper<>(Order.class))
.build();
}
@Bean
public JdbcBatchItemWriter<ProcessedOrder> workerWriter() {
return new JdbcBatchItemWriterBuilder<ProcessedOrder>()
.dataSource(dataSource)
.sql("INSERT INTO processed_orders (order_id, customer_id, final_amount, status) " +
"VALUES (:orderId, :customerId, :finalAmount, :status) " +
"ON DUPLICATE KEY UPDATE status = VALUES(status), final_amount = VALUES(final_amount)")
.beanMapped()
.assertUpdates(false)
.build();
}
@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Order, ProcessedOrder>chunk(200, tx)
.reader(workerOrderReader(null, null))
.processor(orderProcessor())
.writer(workerWriter())
.build();
}
// ── Kafka Integration ────────────────────────────────────────────────────
@Bean
public DirectChannel requestsChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel repliesChannel() {
return new DirectChannel();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> requestsAdapter(
ConsumerFactory<String, String> consumerFactory) {
ContainerProperties props = new ContainerProperties("partition-requests");
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(consumerFactory, props);
KafkaMessageDrivenChannelAdapter<String, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setOutputChannel(requestsChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "requestsChannel",
outputChannel = "repliesChannel")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler handler = new StepExecutionRequestHandler();
handler.setJobExplorer(jobExplorer);
handler.setStepLocator(stepLocator());
return handler;
}
@Bean
@ServiceActivator(inputChannel = "repliesChannel")
public MessageHandler kafkaRepliesHandler(
KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("partition-replies"));
return handler;
}
@Bean
public BeanFactoryStepLocator stepLocator() {
return new BeanFactoryStepLocator();
}
}
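The orderProcessor() method called by workerStep is not shown either. A minimal sketch of the missing bean (inside WorkerBatchConfig), assuming ProcessedOrder has setters matching the writer's named parameters:
@Bean
public ItemProcessor<Order, ProcessedOrder> orderProcessor() {
    return order -> {
        ProcessedOrder processed = new ProcessedOrder();
        processed.setOrderId(order.getOrderId());
        processed.setCustomerId(order.getCustomerId());
        processed.setFinalAmount(order.getAmount()); // placeholder business rule
        processed.setStatus("PROCESSED");
        return processed;
    };
}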
Remote Chunking Overview
Remote chunking keeps the manager doing the reading; workers only process and write.
Manager JVM Worker JVM(s)
───────────────────── ──────────────────────────────
ManagerStep
│
├─ ItemReader (reads from DB/file)
│ │
│ └─ sends chunks to ──────────► chunk-requests topic
│ │
└─ waits for acks ◄──────────────── chunk-replies topic
│
Worker
├─ ItemProcessor
└─ ItemWriter (writes to DB)
Use remote chunking when:
- Reading is not the bottleneck (the manager can keep up)
- Workers have no direct database access to the source
- You want to horizontally scale writes only
Remote chunking sends actual items over Kafka — keep items small or use compact serialisation.
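One cheap way to shrink chunk messages on the wire is producer-side compression, which Spring Boot exposes as a property:
spring.kafka.producer.compression-type=lz4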
// Manager side — sends items to workers
@Bean
public ChunkMessageChannelItemWriter<Order> remoteChunkWriter(
MessagingTemplate messagingTemplate) {
ChunkMessageChannelItemWriter<Order> writer = new ChunkMessageChannelItemWriter<>();
writer.setMessagingOperations(messagingTemplate);
writer.setReplyChannel(repliesChannel());
return writer;
}
// Manager step — reads normally, delegates writing to workers
@Bean
public Step remoteChunkManagerStep() {
return new StepBuilder("remoteChunkManagerStep", jobRepository)
.<Order, Order>chunk(100, tx)
.reader(orderReader())
.writer(remoteChunkWriter(messagingTemplate()))
.build();
}
// Worker side — receives chunks, processes, writes
@Bean
@ServiceActivator(inputChannel = "chunkRequestsChannel",
outputChannel = "chunkRepliesChannel")
public ChunkProcessorChunkHandler<Order> chunkHandler() {
ChunkProcessorChunkHandler<Order> handler = new ChunkProcessorChunkHandler<>();
handler.setChunkProcessor(new SimpleChunkProcessor<>(
orderProcessor(), orderWriter()));
return handler;
}
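The chunkRequestsChannel and chunkRepliesChannel used here need Kafka wiring analogous to the partitioning setup shown earlier. A worker-side sketch, assuming chunk-requests and chunk-replies topics:
@Bean
public DirectChannel chunkRequestsChannel() {
    return new DirectChannel();
}
@Bean
public DirectChannel chunkRepliesChannel() {
    return new DirectChannel();
}
// Inbound: chunk-requests topic -> chunkRequestsChannel
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> chunkRequestsAdapter(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties props = new ContainerProperties("chunk-requests");
    KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, props);
    KafkaMessageDrivenChannelAdapter<String, String> adapter =
        new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setOutputChannel(chunkRequestsChannel());
    return adapter;
}
// Outbound: chunkRepliesChannel -> chunk-replies topic
@Bean
@ServiceActivator(inputChannel = "chunkRepliesChannel")
public MessageHandler chunkRepliesHandler(KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
        new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("chunk-replies"));
    return handler;
}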
application.properties — Worker
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=batch-workers
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Shared batch metadata DB (all JVMs must point to the same DB)
spring.datasource.url=jdbc:mysql://db-host:3306/batch_db
spring.batch.jdbc.initialize-schema=never
# Workers don't launch jobs; they only respond to step-execution requests
# (an inline # is not a comment in .properties, so this must be on its own line)
spring.batch.job.enabled=false
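The manager needs mirror-image settings. A sketch, assuming the same broker and metadata database, with a separate consumer group so the manager alone reads partition-replies:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=batch-manager
spring.kafka.consumer.auto-offset-reset=earliest
# Same JSON (de)serializer settings as the workers
spring.datasource.url=jdbc:mysql://db-host:3306/batch_db
spring.batch.jdbc.initialize-schema=never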
Deployment Topology
┌─────────────────────┐ Kafka ┌─────────────────────┐
│ Manager Pod │ ──requests──► │ Worker Pod 1 │
│ (1 instance) │ ◄──replies─── │ (N instances) │
│ │ ├─────────────────────┤
│ Runs job │ │ Worker Pod 2 │
│ Splits partitions │ ├─────────────────────┤
│ Aggregates results│ │ Worker Pod 3 │
└─────────────────────┘ └─────────────────────┘
│ │
└───────────────────────────────────────┘
Shared MySQL (batch metadata + data)
- Manager and workers share the same MySQL for batch metadata.
- Workers read the source data directly from MySQL (remote partitioning) or receive items from Kafka (remote chunking).
- Scale workers horizontally — add pods to increase throughput.
Key Takeaways
- Remote partitioning: manager sends small partition descriptors over Kafka; workers read data directly. Best for database-backed jobs.
- Remote chunking: manager reads and sends items over Kafka; workers process and write. Best when reading is trivial and writing is the bottleneck.
- All JVMs must share the same Spring Batch metadata database — coordination depends on it.
- MessageChannelPartitionHandler on the manager side and StepExecutionRequestHandler on the worker side are the two key integration beans.
- Workers should set spring.batch.job.enabled=false; they respond to messages, they do not launch jobs themselves.
What’s Next
Part 9 (Scaling) is complete. Article 23 starts Part 10 — Production. You will learn how to schedule batch jobs with @Scheduled, Quartz Scheduler, and clustered scheduling for high-availability deployments.