Message Headers: Metadata, Routing, and Custom Header Propagation
What Are Kafka Record Headers?
Every Kafka record carries a list of `Header` objects — key-value pairs of `String` key and `byte[]` value. They sit outside the message payload and are ideal for:
- Trace propagation — carry `X-Trace-Id`/`X-Span-Id` across service boundaries
- Correlation IDs — link a response to a request in async flows
- Routing metadata — signal which region, tenant, or feature flag applies
- Schema type hints — `__TypeId__` (set automatically by `JsonSerializer`)
- Event versioning — indicate schema version without modifying the payload
```mermaid
flowchart LR
    subgraph Record["Kafka Record"]
        direction TB
        Headers["Headers\nX-Trace-Id: abc123\nX-Correlation-Id: req-456\neventType: ORDER_PLACED"]
        Key["Key: order-789"]
        Value["Value: {orderId, customerId, ...}"]
    end
    Producer -->|"send"| Record --> Consumer
```
Writing Headers on the Producer
Using ProducerRecord Directly
```java
@Service
@RequiredArgsConstructor
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public void publish(OrderPlacedEvent event, String traceId, String correlationId) {
        ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
                "orders", event.getOrderId(), event
        );
        record.headers()
                .add("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8))
                .add("X-Correlation-Id", correlationId.getBytes(StandardCharsets.UTF_8))
                .add("eventType", "ORDER_PLACED".getBytes(StandardCharsets.UTF_8))
                .add("schemaVersion", new byte[]{1});
        kafkaTemplate.send(record);
    }
}
```
Using MessageBuilder
```java
Message<OrderPlacedEvent> message = MessageBuilder
        .withPayload(event)
        .setHeader(KafkaHeaders.TOPIC, "orders")
        .setHeader(KafkaHeaders.KEY, event.getOrderId())
        .setHeader("X-Trace-Id", traceId)
        .setHeader("X-Correlation-Id", correlationId)
        .build();

kafkaTemplate.send(message);
```
When you send a `Message<?>`, Spring Kafka's message converter maps the Spring headers onto Kafka record headers through a `KafkaHeaderMapper`, which handles the `String`-to-`byte[]` encoding for you.
Reading Headers on the Consumer
@Header Injection
Inject individual headers by name as method parameters:
```java
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrder(
        OrderPlacedEvent event,
        @Header("X-Trace-Id") String traceId,
        @Header("X-Correlation-Id") String correlationId,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    MDC.put("traceId", traceId);
    log.info("Processing order from partition={} offset={}", partition, offset);
    inventoryService.reserveStock(event);
}
```
`@Header` decodes `byte[]` to `String` automatically for `String`-typed parameters. To receive the raw bytes instead, declare the parameter as `byte[]`.
Accessing All Headers via ConsumerRecord
```java
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
    Headers headers = record.headers();
    String traceId = headerValue(headers, "X-Trace-Id");
    String eventType = headerValue(headers, "eventType");
    log.info("Event: type={} traceId={}", eventType, traceId);
}

private String headerValue(Headers headers, String key) {
    Header header = headers.lastHeader(key);
    return header != null ? new String(header.value(), StandardCharsets.UTF_8) : null;
}
```
`headers.lastHeader(key)` returns `null` if the header is absent — always null-check before calling `.value()`.
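The null-safe read can also be wrapped in an `Optional`. A minimal plain-Java sketch — the possibly-null `byte[]` argument stands in for the result of `lastHeader(key).value()`, so no Kafka types are needed:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SafeHeaderRead {

    // Decode a possibly-absent header value; empty Optional when the header is missing
    static Optional<String> decode(byte[] rawValue) {
        return Optional.ofNullable(rawValue)
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] present = "abc123".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(present).orElse("missing")); // abc123
        System.out.println(decode(null).orElse("missing"));    // missing
    }
}
```

Returning `Optional` pushes the absence check to the call site, where a sensible default is usually known.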
KafkaHeaders Constants
Spring Kafka populates these headers automatically — read them without setting them yourself:
```java
@KafkaListener(topics = "orders")
public void onOrder(
        OrderPlacedEvent event,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Header(KafkaHeaders.GROUP_ID) String groupId) {
    log.info("Received from topic={} partition={} offset={} ts={}",
            topic, partition, offset, timestamp);
}
```
| Constant | Value | Description |
|---|---|---|
| `TOPIC` | `kafka_topic` | Topic to send to (producer) |
| `KEY` | `kafka_messageKey` | Record key (producer) |
| `RECEIVED_TOPIC` | `kafka_receivedTopic` | Topic received from |
| `RECEIVED_PARTITION` | `kafka_receivedPartitionId` | Partition index |
| `OFFSET` | `kafka_offset` | Record offset |
| `RECEIVED_TIMESTAMP` | `kafka_receivedTimestamp` | Record timestamp (epoch ms) |
| `GROUP_ID` | `kafka_groupId` | Consumer group ID |
Propagating Trace Context Automatically
Use a ProducerInterceptor to inject trace headers on every outgoing record without modifying each send() call:
```java
public class TracingProducerInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        String traceId = MDC.get("traceId");
        if (traceId != null) {
            record.headers().add("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```
Register in producer config:
```java
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
```
On the consumer side, use a RecordInterceptor to extract and restore the trace context:
```java
@Bean
public RecordInterceptor<String, Object> tracingConsumerInterceptor() {
    return (record, consumer) -> {
        Header traceHeader = record.headers().lastHeader("X-Trace-Id");
        if (traceHeader != null) {
            MDC.put("traceId", new String(traceHeader.value(), StandardCharsets.UTF_8));
        }
        return record;
    };
}
```
Wire it into the factory:
```java
factory.setRecordInterceptor(tracingConsumerInterceptor());
```
Now every listener has traceId in the MDC without any per-listener code.
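One caveat: listener container threads are reused across records, so a `traceId` put into the MDC lingers into the next record unless it is cleared (`RecordInterceptor` also offers an `afterRecord` callback well suited to cleanup). The underlying set-then-clear pattern, sketched with a plain `ThreadLocal` standing in for the MDC so the example needs no Spring or SLF4J types:

```java
public class TraceScope {

    // Stand-in for the MDC: one trace id per consumer thread
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static String current() {
        return TRACE_ID.get();
    }

    // Set the id for the duration of one record's processing, then always clear it
    static void runWithTrace(String traceId, Runnable work) {
        TRACE_ID.set(traceId);
        try {
            work.run();
        } finally {
            TRACE_ID.remove(); // without this, the id leaks into the next record on this thread
        }
    }

    public static void main(String[] args) {
        runWithTrace("abc123", () -> System.out.println(current())); // abc123
        System.out.println(current()); // null — cleared after the record
    }
}
```

With the real MDC the equivalent cleanup is `MDC.remove("traceId")` in the post-processing hook.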
Header-Based Routing
Filter records before they reach a listener based on a header value with a `RecordFilterStrategy` — the record is discarded when the strategy returns `true`. This filter lets only EU-region records through:
```java
@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> euRegionFilter() {
    // Returning true discards the record; only records tagged X-Region: EU pass through
    return record -> {
        Header regionHeader = record.headers().lastHeader("X-Region");
        return regionHeader == null ||
                !"EU".equals(new String(regionHeader.value(), StandardCharsets.UTF_8));
    };
}
```
Or inside the listener with a simple dispatch:
```java
@KafkaListener(topics = "orders")
public void onOrder(
        OrderPlacedEvent event,
        @Header(value = "X-Region", required = false) String region) {
    if ("EU".equals(region)) {
        euOrderService.process(event);
    } else {
        globalOrderService.process(event);
    }
}
```
`required = false` prevents a `MessageConversionException` when the header is absent.
Custom Header Encoding
Headers are raw `byte[]` — for non-string types, encode explicitly:
```java
// Write a long as 8 bytes
record.headers().add("eventTimestamp",
        ByteBuffer.allocate(8).putLong(event.getPlacedAt()).array());

// Read it back (null-check lastHeader() in real code — see Common Mistakes below)
Header tsHeader = record.headers().lastHeader("eventTimestamp");
long timestamp = ByteBuffer.wrap(tsHeader.value()).getLong();
```
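The encode/decode pair round-trips cleanly. A self-contained sketch, with the Kafka `Header` lookup replaced by a plain `byte[]` so it runs without Kafka on the classpath:

```java
import java.nio.ByteBuffer;

public class LongHeaderCodec {

    // Encode a long into the 8-byte big-endian form used as the header value
    static byte[] encode(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // Decode the 8 bytes back; callers must null-check the header before calling this
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        long placedAt = 1_700_000_000_123L;
        byte[] encoded = encode(placedAt);
        System.out.println(encoded.length);  // 8
        System.out.println(decode(encoded)); // 1700000000123
    }
}
```

`ByteBuffer` defaults to big-endian, so producer and consumer agree as long as both use this pair.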
Common Mistakes
- **Reading a missing header without a null check** — `headers.lastHeader("X-Trace-Id").value()` throws a `NullPointerException` when the header is absent. Always null-check or use `required = false` on `@Header`.
- **Using `@Header` with `byte[]` but expecting `String`** — `@Header` does not auto-decode when the parameter type is `byte[]`. Decode manually with `new String(bytes, UTF_8)`.
- **Adding duplicate headers** — Kafka allows multiple headers with the same key. `headers.lastHeader(key)` returns the last one; `headers.headers(key)` returns all. Use `headers.remove(key)` before `headers.add(...)` to replace instead of append.
Key Takeaways
- Kafka headers are `byte[]` key-value pairs — always encode/decode explicitly with a charset
- Use `@Header("name")` for clean injection in `@KafkaListener` parameters; `String`-typed parameters are decoded automatically
- `KafkaHeaders` constants give access to partition, offset, topic, and timestamp without header parsing
- Use `ProducerInterceptor` + `RecordInterceptor` to propagate trace context automatically across all producers and consumers
- Mark optional headers with `@Header(required = false)` — missing required headers throw `MessageConversionException`
- `headers.lastHeader(key)` for single value; `headers.headers(key)` for multi-value; `headers.remove(key)` to replace
Next: Error Handling Basics: DefaultErrorHandler and CommonErrorHandler — configure retries, backoff, and recovery for listener exceptions using Spring Kafka’s error handler infrastructure.