Sending Messages with Keys, Headers, and Custom Partitioning

Why Partitioning Strategy Matters

How you route messages to partitions determines:

  • Ordering: only messages in the same partition are ordered relative to each other
  • Parallelism: how evenly work is distributed across consumers
  • Hot spots: if one key generates 90% of traffic, one partition (and one consumer) gets 90% of the load

flowchart TD
    subgraph Routing["Message Routing Decision"]
        Msg["Message"]
        HasKey{Has key?}
        HasPartition{Explicit partition?}
        KeyHash["hash(key) % numPartitions\n→ deterministic, same partition always"]
        RoundRobin["Sticky partitioning\n(batch to same partition,\nthen round-robin)"]
        Explicit["Use specified partition number"]
        Custom["Custom Partitioner\n(your business logic)"]
    end

    Msg --> HasPartition
    HasPartition -->|Yes| Explicit
    HasPartition -->|No| HasKey
    HasKey -->|Yes| KeyHash
    HasKey -->|No| RoundRobin
    HasKey -->|Custom partitioner set| Custom
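The decision tree above can be sketched as a plain function. This is an illustrative standalone sketch, not Kafka's actual implementation: the real producer hashes keys with murmur2, while plain `hashCode()` is used here only to keep the example self-contained.

```java
// Illustrative sketch of the default routing decision:
// explicit partition > key hash > sticky partition for keyless records.
public class RoutingSketch {

    static int route(Integer explicitPartition, String key,
                     int numPartitions, int stickyPartition) {
        if (explicitPartition != null) {
            return explicitPartition;                        // 1. explicit partition wins
        }
        if (key != null) {
            return Math.floorMod(key.hashCode(), numPartitions); // 2. deterministic key hash
        }
        return stickyPartition;                              // 3. keyless: current sticky partition
    }

    public static void main(String[] args) {
        // Same key always routes to the same partition
        System.out.println(route(null, "ORD-1", 3, 0) == route(null, "ORD-1", 3, 0)); // true
        // An explicit partition bypasses the key entirely
        System.out.println(route(2, "ORD-1", 3, 0)); // 2
    }
}
```

The ordering of the three checks is the point: an explicit partition silences both the key hash and any custom partitioner, which is why the section on explicit assignment below advises using it sparingly.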

Message Keys and Partition Affinity

Using a key guarantees that all messages with the same key land on the same partition — preserving per-key ordering.

flowchart LR
    subgraph Producer
        E1["orderId: ORD-1\nOrderPlaced"]
        E2["orderId: ORD-2\nOrderPlaced"]
        E3["orderId: ORD-1\nPaymentCharged"]
        E4["orderId: ORD-1\nOrderShipped"]
    end

    subgraph Topic["Topic: orders (3 partitions)"]
        P0["Partition 0\nhash(ORD-1) % 3 = 0\n[ORD-1:Placed][ORD-1:Charged][ORD-1:Shipped]"]
        P1["Partition 1\nhash(ORD-2) % 3 = 1\n[ORD-2:Placed]"]
        P2["Partition 2 (empty)"]
    end

    E1 -->|key=ORD-1| P0
    E3 -->|key=ORD-1| P0
    E4 -->|key=ORD-1| P0
    E2 -->|key=ORD-2| P1

All three events for ORD-1 are on Partition 0, in the order they were produced. The Inventory Service consumer reading Partition 0 sees them in that exact sequence.

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Order lifecycle events — keyed by orderId for ordering
    public void publishOrderPlaced(OrderPlacedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);
    }

    public void publishOrderShipped(OrderShippedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);  // same key, same partition
    }

    public void publishOrderCancelled(OrderCancelledEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);  // same key, same partition
    }
}

Choosing the Right Key

Key choice   | Ordering guarantee                    | Distribution
orderId      | All events for one order in sequence  | Even if orders are spread randomly
customerId   | All customer’s orders in sequence     | Risk of hot spots for power users
null         | None (round-robin)                    | Most even distribution
regionCode   | Events per region in sequence         | May create hot spots by region

Rule of thumb: key by the entity whose events must be processed in order. For e-commerce: orderId for the order lifecycle topic, productId for inventory updates.
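
The hot-spot risk in the table can be demonstrated in isolation. The simulation below is hypothetical: it routes 999 messages to 3 partitions using plain `hashCode()` in place of Kafka's murmur2 hash, once with high-cardinality order IDs and once with a boolean-like key.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative simulation of key cardinality vs. partition usage.
public class KeyCardinalityDemo {

    // Count how many messages land on each partition for a given set of keys.
    static Map<Integer, Integer> distribute(String[] keys, int numPartitions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String key : keys) {
            int partition = Math.floorMod(key.hashCode(), numPartitions);
            counts.merge(partition, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] orderIds = new String[999];
        String[] boolKeys = new String[999];
        for (int i = 0; i < 999; i++) {
            orderIds[i] = "ORD-" + i;                       // high cardinality
            boolKeys[i] = (i % 2 == 0) ? "true" : "false";  // cardinality of 2
        }
        // High-cardinality keys reach all 3 partitions
        System.out.println(distribute(orderIds, 3).size());
        // A boolean key can never reach more than 2 partitions,
        // no matter how many the topic has
        System.out.println(distribute(boolKeys, 3).size());
    }
}
```

With two distinct key values, at most two partitions ever receive traffic; adding partitions to the topic changes nothing, which is exactly the low-cardinality anti-pattern covered later in this section.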


Message Headers

Headers are key-value metadata attached to a record without being part of the payload. Use them for:

  • Distributed tracing (correlation IDs, trace IDs)
  • Event type routing at the consumer
  • Schema version
  • Source service name

Setting Headers with MessageBuilder

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderPlaced(OrderPlacedEvent event, String correlationId) {
        Message<OrderPlacedEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader(KafkaHeaders.TOPIC, "orders")
            .setHeader(KafkaHeaders.KEY, event.orderId())  // still keyed for partition affinity
            .setHeader("correlationId", correlationId)
            .setHeader("eventType", "ORDER_PLACED")
            .setHeader("sourceService", "order-service")
            .setHeader("schemaVersion", "1")
            .build();

        kafkaTemplate.send(message);
    }
}

Setting Headers with ProducerRecord

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public void publishWithRawHeaders(OrderPlacedEvent event, String traceId) {
    RecordHeaders headers = new RecordHeaders();
    headers.add("traceId", traceId.getBytes(StandardCharsets.UTF_8));
    headers.add("eventType", "ORDER_PLACED".getBytes(StandardCharsets.UTF_8));
    headers.add("sourceService", "order-service".getBytes(StandardCharsets.UTF_8));

    ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
        "orders",                    // topic
        null,                        // partition: null keeps key-based routing
        System.currentTimeMillis(),  // timestamp
        event.orderId(),             // key
        event,                       // value
        headers
    );

    kafkaTemplate.send(record);
}

Reading Headers in the Consumer

@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderEvent(
        @Payload OrderPlacedEvent event,
        @Header("correlationId") String correlationId,
        @Header("eventType") String eventType,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {

    log.info("[{}] Received {} on partition={} offset={}",
        correlationId, eventType, partition, offset);
    inventoryService.reserveStock(event);
}

Correlation ID Propagation Pattern

In a microservices architecture, propagate a correlation ID from the HTTP request through every Kafka event to enable end-to-end request tracing.

sequenceDiagram
    participant Client
    participant OrderSvc as Order Service
    participant Kafka
    participant InvSvc as Inventory Service
    participant Log as Central Logging

    Client->>OrderSvc: POST /orders\nX-Correlation-Id: abc-123
    OrderSvc->>OrderSvc: Create order
    OrderSvc->>Kafka: Publish OrderPlacedEvent\nheader: correlationId=abc-123
    Kafka->>InvSvc: Deliver OrderPlacedEvent\nheader: correlationId=abc-123
    InvSvc->>InvSvc: Reserve stock
    InvSvc->>Log: log("correlationId=abc-123 stock reserved")
    OrderSvc->>Log: log("correlationId=abc-123 order created")
    Note over Client,Log: All logs for one request share\ncorrelationId=abc-123

@Component
public class CorrelationIdProducerInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        String correlationId = MDC.get("correlationId");
        if (correlationId != null) {
            record.headers().add("correlationId",
                correlationId.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }

    @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

Register the interceptor:

spring.kafka.producer.properties.interceptor.classes=\
  com.example.CorrelationIdProducerInterceptor

Custom Partitioner

When the default key-hash routing doesn’t fit your needs, implement Partitioner:

Example: Route by Customer Tier

VIP customers get Partition 0 (dedicated consumer with higher concurrency). Standard customers are distributed across Partitions 1–N.

import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomerTierPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {

        int numPartitions = cluster.partitionCountForTopic(topic);

        // key is customerId
        if (key instanceof String customerId) {
            if (isVipCustomer(customerId)) {
                return 0;  // VIP always to partition 0
            }
        }

        // Standard customers: hash across partitions 1..numPartitions-1
        // (floorMod avoids the Math.abs(Integer.MIN_VALUE) negative-result bug)
        return 1 + Math.floorMod(Arrays.hashCode(keyBytes), numPartitions - 1);
    }

    private boolean isVipCustomer(String customerId) {
        return customerId.startsWith("VIP-");
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
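
The routing arithmetic can be sanity-checked without a broker. The helper below is a hypothetical standalone mirror of the `partition()` math (no `Cluster` object needed), useful as the core of a unit test:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Standalone mirror of the CustomerTierPartitioner routing math (illustrative).
public class TierRoutingCheck {

    static int routeForTier(String customerId, int numPartitions) {
        if (customerId.startsWith("VIP-")) {
            return 0;  // VIP traffic pinned to partition 0
        }
        byte[] keyBytes = customerId.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in range even for a negative hash
        return 1 + Math.floorMod(Arrays.hashCode(keyBytes), numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println(routeForTier("VIP-001", 3));  // 0
        int p = routeForTier("CUST-100", 3);
        System.out.println(p >= 1 && p <= 2);            // true: standard tier stays off partition 0
    }
}
```

Asserting these properties in a test catches off-by-one mistakes in the `1 + hash % (n - 1)` arithmetic before they scramble production traffic.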

Register in configuration:

spring.kafka.producer.properties.partitioner.class=\
  com.example.CustomerTierPartitioner

flowchart LR
    subgraph Customers
        VIP["VIP-001\nVIP-002"]
        Std["CUST-100\nCUST-200\nCUST-300"]
    end

    subgraph Partitions
        P0["Partition 0\n(VIP only)\nhigh-priority consumer"]
        P1["Partition 1"]
        P2["Partition 2"]
    end

    VIP -->|always| P0
    Std -->|hash % 2 + 1| P1
    Std -->|hash % 2 + 1| P2

Uniform Sticky Partitioning (Built-in, Kafka 2.4+)

For keyless records, the default partitioner uses sticky partitioning (KIP-480): it sends records to one partition until the batch fills or linger.ms elapses, then switches to another partition. This improves throughput by producing fewer, larger batches while keeping long-run distribution even.

No configuration is needed — this has been the default behavior for keyless records since Kafka 2.4, and since Kafka 3.3 (KIP-794) uniform sticky partitioning is built into the producer itself.
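
Sticky batching is governed by the same producer settings as any batching behavior; tuning them changes how long the producer "sticks" to one partition. The values below are illustrative, not recommendations:

```properties
# Larger batches: producer sticks to one partition longer before switching
spring.kafka.producer.batch-size=32768
# Wait up to 20 ms for a batch to fill before sending
spring.kafka.producer.properties.linger.ms=20
```

Raising batch-size and linger.ms trades a little latency for fewer, larger produce requests.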


Explicit Partition Assignment

For cases where you know exactly which partition to target:

// Partition 0 for high-priority orders
public void publishHighPriorityOrder(OrderPlacedEvent event) {
    ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
        "orders",
        0,              // explicit partition
        event.orderId(),
        event
    );
    kafkaTemplate.send(record);
}

Use explicit partition assignment sparingly. It bypasses key-based routing and custom partitioners, and a hardcoded partition number can point at the wrong place (or at nothing) once the partition count changes. Prefer keys plus custom partitioners.
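
Partition-count changes undermine any routing assumption — even key hashing remaps keys when the count changes. A tiny sketch (plain `hashCode()` standing in for Kafka's murmur2):

```java
// Why repartitioning breaks affinity: the same key can map to a
// different partition once the partition count changes.
public class RepartitionDemo {

    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("ORD-1", 3));
        // After adding a partition, the mapping may change:
        System.out.println(partitionFor("ORD-1", 4));
    }
}
```

This is why partition counts are best chosen generously up front: adding partitions later moves existing keys and breaks per-key ordering across the transition.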


Partitioning Anti-Patterns

flowchart TD
    AP1["❌ Anti-Pattern 1:\nAll messages with null key\n→ no per-entity ordering;\nevents for one entity scatter across partitions"]
    AP2["❌ Anti-Pattern 2:\nUsing timestamp as key\n→ monotonically increasing → last partition gets all writes"]
    AP3["❌ Anti-Pattern 3:\nVery low cardinality key\n(e.g. boolean isVip)\n→ only 2 active partitions regardless of count"]
    AP4["❌ Anti-Pattern 4:\nChanging key strategy mid-life\n→ breaks ordering for in-flight events"]

    Fix1["✓ Use a high-cardinality business key\n(orderId, customerId, productId)"]
    Fix2["✓ Use entity ID as key;\nif ordering by time matters,\nproduce to a single partition"]
    Fix3["✓ Add sub-key: regionCode+isVip\nfor better distribution"]
    Fix4["✓ Plan key strategy before first event;\nif changing, coordinate with consumers"]

    AP1 --- Fix1
    AP2 --- Fix2
    AP3 --- Fix3
    AP4 --- Fix4

Key Takeaways

  • Keys guarantee partition affinity: same key → same partition → ordered delivery for that key
  • Choose keys that match your ordering requirements — orderId for order lifecycle, productId for inventory
  • Headers carry metadata without polluting the payload — use them for correlation IDs, event types, schema versions
  • Propagate correlation IDs from HTTP requests into Kafka headers using a ProducerInterceptor
  • Implement Partitioner for custom routing logic (priority queues, tenant-based routing, region-based sharding)
  • Avoid low-cardinality keys (booleans, enums with few values) — they create hot spots
  • Explicit partition assignment bypasses routing logic — prefer keys + custom partitioners

Next: Producer @Bean Configuration: Beyond application.properties — full programmatic control over producer configuration, multiple producers with different settings, and the ProducerFactory API.