Handling Deserialization Errors Gracefully
The Problem: Poison Pills at Deserialization Time
A malformed byte sequence — truncated JSON, wrong Avro schema, corrupt payload — throws an exception during deserialization, before the listener method is called. Without special handling, this record blocks the partition indefinitely: the consumer fetches it, fails to deserialize, and fetches it again on the next poll.
sequenceDiagram
participant Broker
participant Container
participant Deserializer
loop forever without ErrorHandlingDeserializer
Container->>Broker: poll()
Broker-->>Container: [good-record, CORRUPT-RECORD, good-record]
Container->>Deserializer: deserialize(CORRUPT-RECORD)
Deserializer-->>Container: JsonProcessingException 💥
Note over Container: Partition blocked — same record on every poll
end
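To reproduce the problem locally, publish raw bytes that are not valid JSON. A minimal sketch using the plain Kafka producer API (the topic name and broker address match the examples below):
// Sketch: send deliberately truncated JSON as raw bytes to create a poison pill
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
    byte[] truncatedJson = "{\"orderId\": 42,".getBytes(StandardCharsets.UTF_8);
    producer.send(new ProducerRecord<>("orders", "order-42", truncatedJson));
}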
ErrorHandlingDeserializer solves this by catching the deserialization exception and wrapping it in a DeserializationException that the listener container can route to the error handler.
ErrorHandlingDeserializer
Wrap any deserializer — the underlying deserializer does the actual work; ErrorHandlingDeserializer catches any exception:
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
    // Wrap key deserializer
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
        StringDeserializer.class.getName());
    // Wrap value deserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        JsonDeserializer.class.getName());
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
    // Fallback target type for when the producer does not send type headers
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderPlacedEvent.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);
}
Or construct directly with instances:
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    JsonDeserializer<OrderPlacedEvent> jsonDeserializer =
        new JsonDeserializer<>(OrderPlacedEvent.class);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(
        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
        new ErrorHandlingDeserializer<>(new StringDeserializer()),
        new ErrorHandlingDeserializer<>(jsonDeserializer)
    );
}
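Neither factory takes effect until it is wired into a listener container factory. One possible wiring, assuming the errorHandler bean defined later in this section:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderPlacedEvent> consumerFactory,
        DefaultErrorHandler errorHandler) {
    ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Routes the DeserializationException carried in the record header to the handler
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}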
What Happens When Deserialization Fails
When ErrorHandlingDeserializer catches an exception, it:
- Sets the record’s value to null
- Adds the serialized DeserializationException as a header on the record
- Returns the record to the container with a null value
The container detects the header, extracts the DeserializationException, and passes it to DefaultErrorHandler. Because DeserializationException is a non-retryable exception by default, the error handler immediately calls the recoverer (DLT publisher) — no retries.
flowchart LR
Bytes["Corrupt bytes"] --> EHD["ErrorHandlingDeserializer"]
EHD -->|"catch exception"| Null["record.value() = null\n+ DeserializationException header"]
Null --> Container["Container detects header"]
Container --> DEH["DefaultErrorHandler"]
DEH -->|"non-retryable"| DLT["orders.DLT\n(raw bytes preserved)"]
DLT --> DevOps["Developer inspects\noriginal bytes"]
Routing Deserialization Failures to DLT
DeadLetterPublishingRecoverer publishes the original bytes (not null) to the DLT — useful for inspecting the corrupt payload:
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);
    // Keep the serialized DeserializationException header on the DLT record so a
    // DLT consumer can inspect it; by default the recoverer strips this header
    // and publishes only the raw bytes as the record value
    recoverer.setRetainExceptionHeader(true);
    DefaultErrorHandler handler = new DefaultErrorHandler(
        recoverer,
        new ExponentialBackOff(1000L, 2.0)
    );
    // DeserializationException is already non-retryable by default;
    // mark it explicitly in case the classifier has been reconfigured:
    handler.addNotRetryableExceptions(DeserializationException.class);
    return handler;
}
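By default the recoverer publishes to &lt;original topic&gt;.DLT on the same partition as the failed record. The two-argument constructor accepts a destination resolver if you need something else; a sketch that routes every failure to a single, hypothetical poison-pills topic:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
    kafkaTemplate,
    // A negative partition leaves partition selection to Kafka on the target topic
    (failedRecord, exception) -> new TopicPartition("poison-pills", -1));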
Reading the DeserializationException Header
In the DLT consumer, extract the original bytes and exception details (this assumes the recoverer was configured with setRetainExceptionHeader(true), as above):
@KafkaListener(topics = "orders.DLT", groupId = "dlt-inspector")
public void onDltRecord(ConsumerRecord<String, byte[]> record) {
    // Check if this record failed due to value deserialization; SerializationUtils
    // safely deserializes the exception carried in the header
    DeserializationException ex = SerializationUtils.getExceptionFromHeader(
        record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
        new LogAccessor(getClass()));
    if (ex != null) {
        log.error("Deserialization failed: topic={} partition={} offset={} error={}",
            record.topic(), record.partition(), record.offset(), ex.getMessage());
        // ex.getData() contains the original raw bytes
        log.debug("Raw bytes: {}", Arrays.toString(ex.getData()));
    }
}
Spring Kafka provides the header-name constants SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER and SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER in org.springframework.kafka.support.serializer.
application.properties Configuration
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.events
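If the producer does not send Spring's JSON type headers, also give the delegate a default target type (the event class is this section's example type):
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.events.OrderPlacedEvent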
Null Value Guard in Listener
When ErrorHandlingDeserializer returns null on failure, your listener receives null if the error handler does not intercept first. Add a guard in case your setup does not use a recoverer (note that @Payload(required = false) is needed for Spring to invoke the method with a null payload at all):
@KafkaListener(topics = "orders")
public void onOrder(@Payload(required = false) OrderPlacedEvent event) {
    if (event == null) {
        log.warn("Received null event, likely a deserialization failure");
        return;
    }
    orderService.process(event);
}
With a properly configured DefaultErrorHandler + DeadLetterPublishingRecoverer, the null record never reaches the listener — it goes straight to the DLT.
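Alternatively, ErrorHandlingDeserializer can return a sentinel object instead of null via a failed-deserialization function. A sketch, assuming a hypothetical OrderPlacedEvent.invalid() factory for the sentinel:
JsonDeserializer<OrderPlacedEvent> jsonDeserializer =
    new JsonDeserializer<>(OrderPlacedEvent.class);
jsonDeserializer.addTrustedPackages("com.example.events");
ErrorHandlingDeserializer<OrderPlacedEvent> valueDeserializer =
    new ErrorHandlingDeserializer<>(jsonDeserializer);
valueDeserializer.setFailedDeserializationFunction(info -> {
    // info.getData() holds the raw bytes that failed to deserialize
    log.warn("Bad payload on topic {} ({} bytes)", info.getTopic(), info.getData().length);
    return OrderPlacedEvent.invalid(); // hypothetical sentinel factory
});
Note that a record recovered this way carries no exception header, so it flows to the listener as a normal record rather than to the error handler.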
End-to-End Flow
sequenceDiagram
participant Broker
participant EHD as "ErrorHandlingDeserializer"
participant Container
participant Handler as "DefaultErrorHandler"
participant DLT as "orders.DLT"
Broker-->>Container: CORRUPT_BYTES at offset 55
Container->>EHD: deserialize(CORRUPT_BYTES)
EHD-->>Container: null + DeserializationException header
Container->>Handler: handleOne(null-record, DeserializationException)
Note over Handler: non-retryable — skip backoff
Handler->>DLT: publish(original bytes + error headers)
Handler->>Container: commitOffset(56)
Note over Container: Partition unblocked\nnext record at offset 56 processed
Key Takeaways
- Without ErrorHandlingDeserializer, a single malformed record blocks the partition forever
- Wrap any deserializer with ErrorHandlingDeserializer; it catches exceptions and carries them as a header on a null record
- DeserializationException is non-retryable by default, so the error handler calls the recoverer immediately
- DeadLetterPublishingRecoverer preserves the original raw bytes in the DLT, enabling post-mortem analysis
- Read the deserialization exception header (SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) in your DLT consumer to extract the exact bytes and exception that caused the failure
- Always pair ErrorHandlingDeserializer with a recoverer; null records silently consumed by the listener represent invisible data loss
Next: Non-Blocking Retries: @RetryableTopic, BackOff, and the Retry Topic Chain — retry failed records on separate retry topics without blocking the main partition.