Handling Deserialization Errors Gracefully

The Problem: Poison Pills at Deserialization Time

A malformed byte sequence — truncated JSON, wrong Avro schema, corrupt payload — throws an exception during deserialization, before the listener method is called. Without special handling, this record blocks the partition indefinitely: the consumer fetches it, fails to deserialize, and fetches it again on the next poll.

sequenceDiagram
    participant Broker
    participant Container
    participant Deserializer

    loop forever without ErrorHandlingDeserializer
        Container->>Broker: poll()
        Broker-->>Container: [good-record, CORRUPT-RECORD, good-record]
        Container->>Deserializer: deserialize(CORRUPT-RECORD)
        Deserializer-->>Container: JsonProcessingException 💥
        Note over Container: Partition blocked — same record on every poll
    end

ErrorHandlingDeserializer solves this by catching the deserialization exception and wrapping it in a DeserializationException that the listener container can route to the error handler.


ErrorHandlingDeserializer

Wrap any deserializer — the underlying deserializer does the actual work; ErrorHandlingDeserializer catches any exception:

@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");

    // Wrap key deserializer
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
        StringDeserializer.class.getName());

    // Wrap value deserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        JsonDeserializer.class.getName());
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");

    return new DefaultKafkaConsumerFactory<>(props);
}

Or construct directly with instances:

@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    JsonDeserializer<OrderPlacedEvent> jsonDeserializer =
        new JsonDeserializer<>(OrderPlacedEvent.class);
    jsonDeserializer.addTrustedPackages("*");

    return new DefaultKafkaConsumerFactory<>(
        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
        new ErrorHandlingDeserializer<>(new StringDeserializer()),
        new ErrorHandlingDeserializer<>(jsonDeserializer)
    );
}

What Happens When Deserialization Fails

When ErrorHandlingDeserializer catches an exception, it:

  1. Sets the record’s value to null
  2. Adds a DeserializationException as a header on the record
  3. Returns the record to the container with a null value

The container detects the header, extracts the DeserializationException, and passes it to DefaultErrorHandler. Because DeserializationException is a non-retryable exception by default, the error handler immediately calls the recoverer (DLT publisher) — no retries.

flowchart LR
    Bytes["Corrupt bytes"] --> EHD["ErrorHandlingDeserializer"]
    EHD -->|"catch exception"| Null["record.value() = null\n+ DeserializationException header"]
    Null --> Container["Container detects header"]
    Container --> DEH["DefaultErrorHandler"]
    DEH -->|"non-retryable"| DLT["orders.DLT\n(raw bytes preserved)"]
    DLT --> DevOps["Developer inspects\noriginal bytes"]

Routing Deserialization Failures to DLT

DeadLetterPublishingRecoverer publishes the original bytes (not null) to the DLT — useful for inspecting the corrupt payload:

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);

    DefaultErrorHandler handler = new DefaultErrorHandler(
        recoverer,
        new ExponentialBackOff(1000L, 2.0)
    );

    // DeserializationException is already non-retryable by default
    // Explicitly mark it in case you're using allowlist mode:
    handler.addNotRetryableExceptions(DeserializationException.class);

    return handler;
}

Reading the DeserializationException Header

In the DLT consumer, extract the original bytes and exception details:

@KafkaListener(topics = "orders.DLT", groupId = "dlt-inspector")
public void onDltRecord(ConsumerRecord<String, Object> record) {
    // Check if this failed due to deserialization
    Header exHeader = record.headers()
        .lastHeader(DeserializationExceptionHeader.VALUE_DESERIALIZATION_EXCEPTION_HEADER);

    if (exHeader != null) {
        DeserializationException ex = (DeserializationException)
            new ObjectInputStream(new ByteArrayInputStream(exHeader.value())).readObject();

        log.error("Deserialization failed: topic={} partition={} offset={} error={}",
            ex.getRecord().topic(), ex.getRecord().partition(),
            ex.getRecord().offset(), ex.getMessage());

        // ex.getData() contains the original raw bytes
        log.debug("Raw bytes: {}", Arrays.toString(ex.getData()));
    }
}

Spring Kafka provides DeserializationExceptionHeader.VALUE_DESERIALIZATION_EXCEPTION_HEADER and KEY_DESERIALIZATION_EXCEPTION_HEADER constants for the header names.


application.properties Configuration

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.events

Null Value Guard in Listener

When ErrorHandlingDeserializer returns null on failure, your listener receives null if the error handler does not intercept first. Add a guard in case your setup does not use a recoverer:

@KafkaListener(topics = "orders")
public void onOrder(OrderPlacedEvent event) {
    if (event == null) {
        log.warn("Received null event — likely a deserialization failure");
        return;
    }
    orderService.process(event);
}

With a properly configured DefaultErrorHandler + DeadLetterPublishingRecoverer, the null record never reaches the listener — it goes straight to the DLT.


End-to-End Flow

sequenceDiagram
    participant Broker
    participant EHD as "ErrorHandlingDeserializer"
    participant Container
    participant Handler as "DefaultErrorHandler"
    participant DLT as "orders.DLT"

    Broker-->>Container: CORRUPT_BYTES at offset 55
    Container->>EHD: deserialize(CORRUPT_BYTES)
    EHD-->>Container: null + DeserializationException header
    Container->>Handler: handleOne(null-record, DeserializationException)
    Note over Handler: non-retryable — skip backoff
    Handler->>DLT: publish(original bytes + error headers)
    Handler->>Container: commitOffset(56)
    Note over Container: Partition unblocked\nnext record at offset 56 processed

Key Takeaways

  • Without ErrorHandlingDeserializer, a single malformed record blocks the partition forever
  • Wrap any deserializer with ErrorHandlingDeserializer — it catches exceptions and carries them as a header on a null record
  • DeserializationException is non-retryable by default — the error handler calls the recoverer immediately
  • DeadLetterPublishingRecoverer preserves the original raw bytes in the DLT, enabling post-mortem analysis
  • Read the DeserializationExceptionHeader in your DLT consumer to extract the exact bytes and exception that caused the failure
  • Always pair ErrorHandlingDeserializer with a recoverer — null records silently consumed by the listener represent invisible data loss

Next: Non-Blocking Retries: @RetryableTopic, BackOff, and the Retry Topic Chain — retry failed records on separate retry topics without blocking the main partition.