Sending Messages with Keys, Headers, and Custom Partitioning
Why Partitioning Strategy Matters
How you route messages to partitions determines:
- Ordering: only messages in the same partition are ordered relative to each other
- Parallelism: how evenly work is distributed across consumers
- Hot spots: if one key generates 90% of traffic, one partition (and one consumer) gets 90% of the load
flowchart TD
subgraph Routing["Message Routing Decision"]
Msg["Message"]
HasKey{Has key?}
HasPartition{Explicit partition?}
KeyHash["hash(key) % numPartitions\n→ deterministic, same partition always"]
RoundRobin["Sticky partitioning\n(batch to same partition,\nthen round-robin)"]
Explicit["Use specified partition number"]
Custom["Custom Partitioner\n(your business logic)"]
end
Msg --> HasPartition
HasPartition -->|Yes| Explicit
HasPartition -->|No, custom partitioner configured| Custom
HasPartition -->|No| HasKey
HasKey -->|Yes| KeyHash
HasKey -->|No| RoundRobin
Message Keys and Partition Affinity
Using a key guarantees that all messages with the same key land on the same partition — preserving per-key ordering.
flowchart LR
subgraph Producer
E1["orderId: ORD-1\nOrderPlaced"]
E2["orderId: ORD-2\nOrderPlaced"]
E3["orderId: ORD-1\nPaymentCharged"]
E4["orderId: ORD-1\nOrderShipped"]
end
subgraph Topic["Topic: orders (3 partitions)"]
P0["Partition 0\nhash(ORD-1) % 3 = 0\n[ORD-1:Placed][ORD-1:Charged][ORD-1:Shipped]"]
P1["Partition 1\nhash(ORD-2) % 3 = 1\n[ORD-2:Placed]"]
P2["Partition 2 (empty)"]
end
E1 -->|key=ORD-1| P0
E3 -->|key=ORD-1| P0
E4 -->|key=ORD-1| P0
E2 -->|key=ORD-2| P1
All three events for ORD-1 are on Partition 0, in the order they were produced. The Inventory Service consumer reading Partition 0 sees them in that exact sequence.
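The determinism behind this can be sketched in plain Java. This is a simplified stand-in, not the real client code: Kafka's default partitioner hashes the serialized key bytes with murmur2 rather than `String.hashCode()`, but the same-key-always-same-partition property is identical.

```java
public class KeyRoutingSketch {

    // Simplified model of key-based routing: hash the key, take it modulo
    // the partition count. The real client uses murmur2 over the serialized
    // key bytes, but the determinism property demonstrated here is the same.
    static int partitionFor(String key, int numPartitions) {
        int hash = key.hashCode() & Integer.MAX_VALUE; // mask sign bit: non-negative
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        int first = partitionFor("ORD-1", 3);
        int again = partitionFor("ORD-1", 3);
        int other = partitionFor("ORD-2", 3);
        // Same key routes to the same partition on every call
        System.out.println("ORD-1 -> " + first + ", again -> " + again
                + ", ORD-2 -> " + other);
    }
}
```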
@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Order lifecycle events — keyed by orderId for ordering
    public void publishOrderPlaced(OrderPlacedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);
    }

    public void publishOrderShipped(OrderShippedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event); // same key, same partition
    }

    public void publishOrderCancelled(OrderCancelledEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event); // same key, same partition
    }
}
Choosing the Right Key
| Key choice | Ordering guarantee | Distribution |
|---|---|---|
| `orderId` | All events for one order in sequence | Even; orderIds hash uniformly across partitions |
| `customerId` | All of a customer's orders in sequence | Risk of hot spots for power users |
| `null` | None | Most even (sticky distribution) |
| `regionCode` | Events per region in sequence | May create hot spots by region |
Rule of thumb: key by the entity whose events must be processed in order. For e-commerce: orderId for the order lifecycle topic, productId for inventory updates.
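The cardinality effect in the table can be measured directly. This sketch (hypothetical helper names, `String.hashCode()` standing in for Kafka's murmur2) counts how many partitions each key strategy actually touches:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyCardinalityDemo {

    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Route `distinctKeys` keys and count how many partitions receive traffic
    static int activePartitions(String keyPrefix, int distinctKeys, int numPartitions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < distinctKeys; i++) {
            counts.merge(partitionFor(keyPrefix + i, numPartitions), 1, Integer::sum);
        }
        return counts.size();
    }

    public static void main(String[] args) {
        // High-cardinality key (orderId-like): all 6 partitions carry load
        System.out.println("orderId keys: " + activePartitions("ORD-", 10_000, 6));
        // Low-cardinality key (2 region codes): at most 2 partitions active,
        // no matter how many partitions the topic has
        System.out.println("region keys:  " + activePartitions("REGION-", 2, 6));
    }
}
```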
Message Headers
Headers are key-value metadata attached to a record without being part of the payload. Use them for:
- Distributed tracing (correlation IDs, trace IDs)
- Event type routing at the consumer
- Schema version
- Source service name
Setting Headers with MessageBuilder
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderPlaced(OrderPlacedEvent event, String correlationId) {
        Message<OrderPlacedEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, "orders")
                .setHeader(KafkaHeaders.KEY, event.orderId())
                .setHeader("correlationId", correlationId)
                .setHeader("eventType", "ORDER_PLACED")
                .setHeader("sourceService", "order-service")
                .setHeader("schemaVersion", "1")
                .build();
        kafkaTemplate.send(message);
    }
}
Setting Headers with ProducerRecord
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public void publishWithRawHeaders(OrderPlacedEvent event, String traceId) {
    RecordHeaders headers = new RecordHeaders();
    headers.add("traceId", traceId.getBytes(StandardCharsets.UTF_8));
    headers.add("eventType", "ORDER_PLACED".getBytes(StandardCharsets.UTF_8));
    headers.add("sourceService", "order-service".getBytes(StandardCharsets.UTF_8));

    // (topic, partition — null lets the partitioner decide, timestamp, key, value, headers)
    ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
            "orders", null, System.currentTimeMillis(), event.orderId(), event, headers
    );
    kafkaTemplate.send(record);
}
Reading Headers in the Consumer
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderEvent(
        @Payload OrderPlacedEvent event,
        @Header("correlationId") String correlationId,
        @Header("eventType") String eventType,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {

    log.info("[{}] Received {} on partition={} offset={}",
            correlationId, eventType, partition, offset);
    inventoryService.reserveStock(event);
}
Correlation ID Propagation Pattern
In a microservices architecture, propagate a correlation ID from the HTTP request through every Kafka event to enable end-to-end request tracing.
sequenceDiagram
participant Client
participant OrderSvc as Order Service
participant Kafka
participant InvSvc as Inventory Service
participant Log as Central Logging
Client->>OrderSvc: POST /orders\nX-Correlation-Id: abc-123
OrderSvc->>OrderSvc: Create order
OrderSvc->>Kafka: Publish OrderPlacedEvent\nheader: correlationId=abc-123
Kafka->>InvSvc: Deliver OrderPlacedEvent\nheader: correlationId=abc-123
InvSvc->>InvSvc: Reserve stock
InvSvc->>Log: log("correlationId=abc-123 stock reserved")
OrderSvc->>Log: log("correlationId=abc-123 order created")
Note over Client,Log: All logs for one request share\ncorrelationId=abc-123
@Component
public class CorrelationIdProducerInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        String correlationId = MDC.get("correlationId");
        if (correlationId != null) {
            record.headers().add("correlationId",
                    correlationId.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }

    @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
Register the interceptor:
spring.kafka.producer.properties.interceptor.classes=\
com.example.CorrelationIdProducerInterceptor
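The interceptor reads the correlation ID from MDC, which assumes something upstream (typically a servlet filter handling the HTTP request) put it there. The handoff can be sketched without Spring or SLF4J using a `ThreadLocal`, which is essentially what MDC is under the hood; the method names here are hypothetical stand-ins for the filter and interceptor hooks.

```java
import java.util.HashMap;
import java.util.Map;

public class CorrelationHandoffSketch {

    // ThreadLocal stand-in for SLF4J's MDC: per-thread request context
    static final ThreadLocal<String> CORRELATION_ID = new ThreadLocal<>();

    // Stand-in for an HTTP filter: capture the inbound X-Correlation-Id header
    static void onHttpRequest(String correlationHeader) {
        CORRELATION_ID.set(correlationHeader);
    }

    // Stand-in for the producer interceptor: stamp the id onto record headers
    static Map<String, String> onSend(Map<String, String> recordHeaders) {
        String id = CORRELATION_ID.get();
        if (id != null) {
            recordHeaders.put("correlationId", id);
        }
        return recordHeaders;
    }

    public static void main(String[] args) {
        onHttpRequest("abc-123");
        Map<String, String> headers = onSend(new HashMap<>());
        // The id set on the request thread reaches the outgoing record headers
        System.out.println(headers.get("correlationId")); // prints abc-123
    }
}
```

The key point this illustrates: MDC ties the value to the thread, so the interceptor sees it only if `send()` runs on the same thread that handled the HTTP request (or the context is explicitly propagated to async executors).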
Custom Partitioner
When the default key-hash routing doesn’t fit your needs, implement Partitioner:
Example: Route by Customer Tier
VIP customers get Partition 0 (dedicated consumer with higher concurrency). Standard customers are distributed across Partitions 1–N.
public class CustomerTierPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        // key is customerId
        if (key instanceof String customerId && isVipCustomer(customerId)) {
            return 0; // VIP always to partition 0
        }
        if (numPartitions < 2 || keyBytes == null) {
            return 0; // nothing to spread across; fall back to partition 0
        }
        // Standard customers: hash across partitions 1..N-1.
        // Mask the sign bit rather than calling Math.abs(), which
        // returns a negative value for Integer.MIN_VALUE.
        int hash = Arrays.hashCode(keyBytes) & Integer.MAX_VALUE;
        return 1 + (hash % (numPartitions - 1));
    }

    private boolean isVipCustomer(String customerId) {
        return customerId.startsWith("VIP-");
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
Register in configuration:
spring.kafka.producer.properties.partitioner.class=\
com.example.CustomerTierPartitioner
flowchart LR
subgraph Customers
VIP["VIP-001\nVIP-002"]
Std["CUST-100\nCUST-200\nCUST-300"]
end
subgraph Partitions
P0["Partition 0\n(VIP only)\nhigh-priority consumer"]
P1["Partition 1"]
P2["Partition 2"]
end
VIP -->|always| P0
Std -->|hash % 2 + 1| P1
Std -->|hash % 2 + 1| P2
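The routing rule can be exercised outside Kafka. This sketch reimplements the same arithmetic with hypothetical helper names, minus the Kafka plumbing, to check the invariant the diagram shows: VIP keys always land on partition 0, standard keys never do.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TierRoutingCheck {

    // Same arithmetic as the CustomerTierPartitioner above, as a pure function
    static int route(String customerId, int numPartitions) {
        if (customerId.startsWith("VIP-")) {
            return 0; // dedicated VIP partition
        }
        byte[] keyBytes = customerId.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes) & Integer.MAX_VALUE; // non-negative
        return 1 + (hash % (numPartitions - 1)); // spread over 1..N-1
    }

    public static void main(String[] args) {
        System.out.println("VIP-001  -> " + route("VIP-001", 3));  // always 0
        System.out.println("CUST-100 -> " + route("CUST-100", 3)); // 1 or 2

        // Invariant check: standard customers never share the VIP partition
        for (int i = 0; i < 1_000; i++) {
            int p = route("CUST-" + i, 3);
            if (p < 1 || p > 2) throw new AssertionError("standard key hit partition " + p);
        }
        System.out.println("invariant holds for 1000 standard customers");
    }
}
```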
Sticky Partitioning for Keyless Records (Built-in, Kafka 2.4+)
Since Kafka 2.4 (KIP-480), the default partitioner handles keyless records with sticky partitioning: it batches records to a single partition until the batch fills or linger.ms elapses, then moves to another partition. Fewer, fuller batches mean fewer produce requests and higher throughput, while long-run distribution stays even.
No configuration is needed. (The standalone UniformStickyPartitioner class applies the same sticky behavior to all records, keyed or not; since Kafka 3.3 and KIP-794, sticky partitioning is built into the producer itself and both partitioner classes are deprecated.)
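The sticky behavior can be modeled in a few lines. This is a toy sketch, not the client's actual implementation: it sticks to one randomly chosen partition until a fixed batch count is reached, then jumps to a fresh random partition.

```java
import java.util.concurrent.ThreadLocalRandom;

public class StickySketch {

    private final int numPartitions;
    private final int batchSize;
    private int current;
    private int inBatch;

    StickySketch(int numPartitions, int batchSize) {
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
        this.current = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Called once per keyless record: stay on the current partition until
    // the batch is "full", then pick a new random partition to stick to.
    int nextPartition() {
        if (inBatch == batchSize) {
            inBatch = 0;
            current = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        inBatch++;
        return current;
    }

    public static void main(String[] args) {
        StickySketch sticky = new StickySketch(3, 4);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 12; i++) {
            sb.append(sticky.nextPartition()).append(' ');
        }
        // Runs of 4 identical partition numbers, e.g. "1 1 1 1 0 0 0 0 2 2 2 2"
        System.out.println(sb);
    }
}
```

In the real producer the batch closes on size or `linger.ms`, whichever comes first; the run lengths vary, but the pattern of "stick, then switch" is the same.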
Explicit Partition Assignment
For cases where you know exactly which partition to target:
// Partition 0 for high-priority orders
public void publishHighPriorityOrder(OrderPlacedEvent event) {
    ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
            "orders",
            0,               // explicit partition
            event.orderId(), // key is still recorded, but no longer drives routing
            event
    );
    kafkaTemplate.send(record);
}
Use explicit partition assignment sparingly. It bypasses key-based routing and custom partitioners. It breaks if partition count changes. Prefer keys + custom partitioners.
Partitioning Anti-Patterns
flowchart TD
AP1["❌ Anti-Pattern 1:\nAll messages with null key\n→ no per-key ordering or partition affinity"]
AP2["❌ Anti-Pattern 2:\nUsing timestamp as key\n→ monotonically increasing → last partition gets all writes"]
AP3["❌ Anti-Pattern 3:\nVery low cardinality key\n(e.g. boolean isVip)\n→ only 2 active partitions regardless of count"]
AP4["❌ Anti-Pattern 4:\nChanging key strategy mid-life\n→ breaks ordering for in-flight events"]
Fix1["✓ Use a high-cardinality business key\n(orderId, customerId, productId)"]
Fix2["✓ Use entity ID as key;\nif ordering by time matters,\nproduce to a single partition"]
Fix3["✓ Add sub-key: regionCode+isVip\nfor better distribution"]
Fix4["✓ Plan key strategy before first event;\nif changing, coordinate with consumers"]
AP1 --- Fix1
AP2 --- Fix2
AP3 --- Fix3
AP4 --- Fix4
Key Takeaways
- Keys guarantee partition affinity: same key → same partition → ordered delivery for that key
- Choose keys that match your ordering requirements — `orderId` for order lifecycle, `productId` for inventory
- Headers carry metadata without polluting the payload — use them for correlation IDs, event types, schema versions
- Propagate correlation IDs from HTTP requests into Kafka headers using a `ProducerInterceptor`
- Implement `Partitioner` for custom routing logic (priority queues, tenant-based routing, region-based sharding)
- Avoid low-cardinality keys (booleans, enums with few values) — they create hot spots
- Explicit partition assignment bypasses routing logic — prefer keys + custom partitioners
Next: Producer @Bean Configuration: Beyond application.properties — full programmatic control over producer configuration, multiple producers with different settings, and the ProducerFactory API.