Message Headers: Metadata, Routing, and Custom Header Propagation

What Are Kafka Record Headers?

Every Kafka record can carry a list of Header objects, each pairing a String key with a byte[] value. Headers sit outside the message payload and are ideal for:

  • Trace propagation — carry X-Trace-Id / X-Span-Id across service boundaries
  • Correlation IDs — link a response to a request in async flows
  • Routing metadata — signal which region, tenant, or feature flag applies
  • Schema type hints: __TypeId__ (set automatically by JsonSerializer)
  • Event versioning — indicate schema version without modifying the payload

[Diagram: a Producer sends a Kafka Record to a Consumer. The record holds Headers (X-Trace-Id: abc123, X-Correlation-Id: req-456, eventType: ORDER_PLACED), a Key (order-789), and a Value ({orderId, customerId, ...}).]

Writing Headers on the Producer

Using ProducerRecord Directly

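import java.nio.charset.StandardCharsets;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service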
@RequiredArgsConstructor
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public void publish(OrderPlacedEvent event, String traceId, String correlationId) {
        ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
            "orders", event.getOrderId(), event
        );

        record.headers()
            .add("X-Trace-Id",       traceId.getBytes(StandardCharsets.UTF_8))
            .add("X-Correlation-Id", correlationId.getBytes(StandardCharsets.UTF_8))
            .add("eventType",        "ORDER_PLACED".getBytes(StandardCharsets.UTF_8))
            .add("schemaVersion",    new byte[]{1});

        kafkaTemplate.send(record);
    }
}

Using MessageBuilder

Message<OrderPlacedEvent> message = MessageBuilder
    .withPayload(event)
    .setHeader(KafkaHeaders.TOPIC, "orders")
    .setHeader(KafkaHeaders.KEY, event.getOrderId())
    .setHeader("X-Trace-Id", traceId)
    .setHeader("X-Correlation-Id", correlationId)
    .build();

kafkaTemplate.send(message);

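MessageBuilder itself does not encode anything. When KafkaTemplate converts the Message into a ProducerRecord, Spring Kafka's header mapper (typically DefaultKafkaHeaderMapper when Jackson is on the classpath) writes String header values as UTF-8 bytes, so no manual getBytes() call is needed.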


Reading Headers on the Consumer

@Header Injection

Inject individual headers by name as method parameters:

@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrder(
        OrderPlacedEvent event,
        @Header("X-Trace-Id") String traceId,
        @Header("X-Correlation-Id") String correlationId,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {

    MDC.put("traceId", traceId);
    log.info("Processing order from partition={} offset={}", partition, offset);
    inventoryService.reserveStock(event);
}

@Header decodes byte[] to String automatically for String-typed parameters. To receive the raw bytes instead, declare the parameter as byte[].

Accessing All Headers via ConsumerRecord

@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
    Headers headers = record.headers();

    String traceId = headerValue(headers, "X-Trace-Id");
    String eventType = headerValue(headers, "eventType");

    log.info("Event: type={} traceId={}", eventType, traceId);
}

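private String headerValue(Headers headers, String key) {
    Header header = headers.lastHeader(key);
    // Both the header and its value may be null, so guard each before decoding.
    if (header == null || header.value() == null) {
        return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
}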

headers.lastHeader(key) returns null if the header is absent — always null-check before reading .value().


KafkaHeaders Constants

Spring Kafka populates these headers automatically — read them without setting them yourself:

@KafkaListener(topics = "orders")
public void onOrder(
        OrderPlacedEvent event,
        @Header(KafkaHeaders.RECEIVED_TOPIC)     String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET)             long offset,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Header(KafkaHeaders.GROUP_ID)           String groupId) {

    log.info("Received from topic={} partition={} offset={} ts={}",
        topic, partition, offset, timestamp);
}

Constant            Value                      Description
TOPIC               kafka_topic                Topic to send to (producer)
KEY                 kafka_messageKey           Record key (producer)
RECEIVED_TOPIC      kafka_receivedTopic        Topic received from
RECEIVED_PARTITION  kafka_receivedPartitionId  Partition index
OFFSET              kafka_offset               Record offset
RECEIVED_TIMESTAMP  kafka_receivedTimestamp    Record timestamp (epoch ms)
GROUP_ID            kafka_groupId              Consumer group ID
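
RECEIVED_TIMESTAMP arrives as epoch milliseconds, which is awkward to log or compare directly. The following is a minimal sketch of turning it into java.time values; RecordAge and its methods are hypothetical names introduced for illustration, not part of Spring Kafka.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of Spring Kafka): interprets RECEIVED_TIMESTAMP values. */
final class RecordAge {

    private RecordAge() {}

    /** Converts the epoch-millisecond header value into an Instant. */
    static Instant toInstant(long receivedTimestampMs) {
        return Instant.ofEpochMilli(receivedTimestampMs);
    }

    /** Time elapsed between the record timestamp and the supplied "now". */
    static Duration age(long receivedTimestampMs, Instant now) {
        return Duration.between(toInstant(receivedTimestampMs), now);
    }
}
```

Inside a listener this could be called as RecordAge.age(timestamp, Instant.now()). Whether the timestamp reflects producer creation time or broker append time depends on the topic's message.timestamp.type setting, so treat the result as an approximation of end-to-end delay.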

Propagating Trace Context Automatically

Use a ProducerInterceptor to inject trace headers on every outgoing record without modifying each send() call:

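import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.MDC;

// onSend runs on the thread that calls send(), so that thread's MDC is visible here.
public class TracingProducerInterceptor implements ProducerInterceptor<String, Object> {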

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        String traceId = MDC.get("traceId");
        if (traceId != null) {
            record.headers().add("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Register in producer config:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    TracingProducerInterceptor.class.getName());

On the consumer side, use a RecordInterceptor to extract and restore the trace context:

@Bean
public RecordInterceptor<String, Object> tracingConsumerInterceptor() {
    return (record, consumer) -> {
        Header traceHeader = record.headers().lastHeader("X-Trace-Id");
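        if (traceHeader != null) {
            MDC.put("traceId", new String(traceHeader.value(), StandardCharsets.UTF_8));
        } else {
            // Listener threads are reused: drop any trace ID left by the previous record.
            MDC.remove("traceId");
        }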
        return record;
    };
}

Wire it into the factory:

factory.setRecordInterceptor(tracingConsumerInterceptor());

Now every listener has traceId in the MDC without any per-listener code.
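
Records from producers that never set X-Trace-Id leave the listener without a trace ID. One possible policy, sketched here as an assumption and not something the interceptor above does, is to fall back to a freshly generated ID. TraceIds and resolve are hypothetical names; the helper takes the raw header bytes (or null when the header is absent) so it stays independent of the Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper: resolves the trace ID to put into the MDC for one record. */
final class TraceIds {

    private TraceIds() {}

    /**
     * Returns the UTF-8 decoded header value, or a newly generated random ID
     * when the header bytes are missing (null) or decode to a blank string.
     */
    static String resolve(byte[] rawHeaderValue) {
        if (rawHeaderValue != null) {
            String decoded = new String(rawHeaderValue, StandardCharsets.UTF_8).trim();
            if (!decoded.isEmpty()) {
                return decoded;
            }
        }
        // Assumption: a random UUID is an acceptable trace ID format for this system.
        return UUID.randomUUID().toString();
    }
}
```

In the interceptor, the call would look like MDC.put("traceId", TraceIds.resolve(traceHeader != null ? traceHeader.value() : null)). A generated ID only correlates log lines within this consumer; it cannot be linked back to the upstream request.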


Header-Based Routing

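To route by header value, give a listener a RecordFilterStrategy that discards the records it should not handle, and register it with factory.setRecordFilterStrategy(...). The strategy returns true for records to drop, so this filter passes only records whose X-Region header is EU: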

@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> euRegionFilter() {
    return record -> {
        Header regionHeader = record.headers().lastHeader("X-Region");
        return regionHeader == null ||
               !"EU".equals(new String(regionHeader.value(), StandardCharsets.UTF_8));
    };
}

Or inside the listener with a simple dispatch:

@KafkaListener(topics = "orders")
public void onOrder(
        OrderPlacedEvent event,
        @Header(value = "X-Region", required = false) String region) {

    if ("EU".equals(region)) {
        euOrderService.process(event);
    } else {
        globalOrderService.process(event);
    }
}

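required = false makes the header optional: region is null when the header is absent. Without it, Spring's header argument resolver raises a MessageHandlingException for the missing header and the listener invocation fails.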
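
As the number of regions grows, the if/else dispatch above gets harder to maintain. A lookup table keyed by the header value is one alternative; the sketch below is illustrative only, and RegionRouter, register, and route are hypothetical names that do not come from Spring Kafka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical router: picks a handler from the decoded X-Region header value. */
final class RegionRouter<E> {

    private final Map<String, Consumer<E>> handlers = new ConcurrentHashMap<>();
    private final Consumer<E> fallback;

    /** The fallback handles events whose region is absent or not registered. */
    RegionRouter(Consumer<E> fallback) {
        this.fallback = fallback;
    }

    /** Registers the handler for one region code, e.g. "EU". */
    RegionRouter<E> register(String region, Consumer<E> handler) {
        handlers.put(region, handler);
        return this;
    }

    /** Dispatches to the region's handler; a null or unknown region uses the fallback. */
    void route(String region, E event) {
        // ConcurrentHashMap rejects null keys, so check for null before the lookup.
        Consumer<E> handler = (region == null) ? fallback : handlers.getOrDefault(region, fallback);
        handler.accept(event);
    }
}
```

With a router built as new RegionRouter<OrderPlacedEvent>(globalOrderService::process).register("EU", euOrderService::process), the listener body reduces to router.route(region, event), and adding a region no longer touches the listener.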


Custom Header Encoding

Headers are raw byte[] — for non-string types, encode explicitly:

// Write a long as 8 bytes
record.headers().add("eventTimestamp",
    ByteBuffer.allocate(8).putLong(event.getPlacedAt()).array());

// Read it back; lastHeader returns null when the header is absent
Header tsHeader = record.headers().lastHeader("eventTimestamp");
Long timestamp = tsHeader != null ? ByteBuffer.wrap(tsHeader.value()).getLong() : null;
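
Keeping the encode and decode sides in one place helps producers and consumers agree on the byte layout. The following is a minimal sketch of such a helper, assuming long values are stored as 8 big-endian bytes and strings as UTF-8; HeaderCodec and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts typed values to and from raw header bytes. */
final class HeaderCodec {

    private HeaderCodec() {}

    /** Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes 8 big-endian bytes; rejects null or wrongly sized input. */
    static long decodeLong(byte[] raw) {
        if (raw == null || raw.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "expected " + Long.BYTES + " bytes, got " + (raw == null ? "null" : raw.length));
        }
        return ByteBuffer.wrap(raw).getLong();
    }

    /** Encodes a string as UTF-8 bytes. */
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes; a null value stays null. */
    static String decodeString(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```

A producer would then write record.headers().add("eventTimestamp", HeaderCodec.encodeLong(event.getPlacedAt())), and a consumer would pass header.value() to HeaderCodec.decodeLong. The length check turns a malformed header into a clear error instead of a BufferUnderflowException.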

Common Mistakes

Reading a missing header without a null check: headers.lastHeader("X-Trace-Id").value() throws NullPointerException when the header is absent. Always null-check or use required = false on @Header.

Using @Header with byte[] but expecting String: @Header does not auto-decode when the parameter type is byte[]. Decode manually with new String(bytes, UTF_8).

Adding duplicate headers: Kafka allows multiple headers with the same key. headers.lastHeader(key) returns the last one; headers.headers(key) returns all. Use headers.remove(key) before headers.add(...) to replace instead of append.


Key Takeaways

  • Kafka header values are raw byte[] (keys are Strings); always encode and decode them explicitly with a charset
  • Use @Header("name") for clean injection in @KafkaListener parameters; String-typed parameters are decoded automatically
  • KafkaHeaders constants give access to partition, offset, topic, and timestamp without header parsing
  • Use ProducerInterceptor + RecordInterceptor to propagate trace context automatically across all producers and consumers
  • Mark optional headers with @Header(required = false); a missing required header fails the listener invocation with a MessageHandlingException
  • headers.lastHeader(key) for single value; headers.headers(key) for multi-value; headers.remove(key) followed by headers.add(...) to replace

Next: Error Handling Basics: DefaultErrorHandler and CommonErrorHandler — configure retries, backoff, and recovery for listener exceptions using Spring Kafka’s error handler infrastructure.