Custom Serializers and Deserializers
When to Write a Custom Serializer
Spring Kafka ships JSON support out of the box, and Avro is typically covered by Confluent's schema-registry serializers. You need a custom serializer when:
- Your team uses Protobuf or MessagePack and wants native support
- You need a compact binary format for high-throughput topics (pricing ticks, sensor readings)
- You’re integrating with a legacy system that publishes a fixed binary protocol
- You want deterministic serialization for event deduplication or content-addressed storage
The Serializer and Deserializer Interfaces
// org.apache.kafka.common.serialization
public interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T data);
    default void configure(Map<String, ?> configs, boolean isKey) {}
    default void close() {}
}

public interface Deserializer<T> extends Closeable {
    T deserialize(String topic, byte[] data);
    default void configure(Map<String, ?> configs, boolean isKey) {}
    default void close() {}
}
Both interfaces are simple — implement serialize or deserialize, and optionally override configure for runtime configuration and close to release resources.
Example: Custom Binary OrderPlacedEvent Serializer
The goal: serialize OrderPlacedEvent to a compact binary format without JSON overhead.
flowchart LR
subgraph Event["OrderPlacedEvent"]
F1["orderId (UUID → 16 bytes)"]
F2["customerId (length-prefixed UTF-8)"]
F3["totalAmount (double → 8 bytes)"]
F4["placedAt (epoch millis → 8 bytes)"]
end
Event -->|"serialize"| Bytes["[16][len][str bytes][8][8]"]
Bytes -->|"deserialize"| Event2["OrderPlacedEvent"]
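The examples assume a plain event class along these lines — a sketch inferred from the getters used in the serializer below; your real class may add validation or be a Java record:

```java
// Hypothetical event class; field names match the getters the serializer uses.
public class OrderPlacedEvent {
    private final String orderId;     // UUID string
    private final String customerId;  // arbitrary-length UTF-8
    private final double totalAmount;
    private final long placedAt;      // epoch millis

    public OrderPlacedEvent(String orderId, String customerId,
                            double totalAmount, long placedAt) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.totalAmount = totalAmount;
        this.placedAt = placedAt;
    }

    public String getOrderId()     { return orderId; }
    public String getCustomerId()  { return customerId; }
    public double getTotalAmount() { return totalAmount; }
    public long getPlacedAt()      { return placedAt; }
}
```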
Serializer
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class OrderPlacedEventSerializer implements Serializer<OrderPlacedEvent> {

    @Override
    public byte[] serialize(String topic, OrderPlacedEvent event) {
        if (event == null) {
            return null; // tombstone records carry a null value
        }
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);

            // orderId as UUID bits (fixed 16 bytes)
            UUID id = UUID.fromString(event.getOrderId());
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());

            // customerId as length-prefixed UTF-8
            byte[] customerBytes = event.getCustomerId().getBytes(StandardCharsets.UTF_8);
            out.writeInt(customerBytes.length);
            out.write(customerBytes);

            // totalAmount and placedAt as fixed-width primitives
            out.writeDouble(event.getTotalAmount());
            out.writeLong(event.getPlacedAt());

            return baos.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize OrderPlacedEvent", e);
        }
    }
}
Deserializer
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class OrderPlacedEventDeserializer implements Deserializer<OrderPlacedEvent> {

    @Override
    public OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstone
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

            long mostSig = in.readLong();
            long leastSig = in.readLong();
            String orderId = new UUID(mostSig, leastSig).toString();

            int len = in.readInt();
            byte[] customerBytes = new byte[len];
            in.readFully(customerBytes);
            String customerId = new String(customerBytes, StandardCharsets.UTF_8);

            double totalAmount = in.readDouble();
            long placedAt = in.readLong();

            return new OrderPlacedEvent(orderId, customerId, totalAmount, placedAt);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize OrderPlacedEvent", e);
        }
    }
}
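Because every field except customerId is fixed-width, the record size is fully determined by the customerId length: 16 bytes (UUID) + 4 (length prefix) + N (UTF-8 bytes) + 8 (double) + 8 (long). A stdlib-only sketch that computes it (WireSize and serializedSize are illustrative names, not part of the serializer above):

```java
import java.nio.charset.StandardCharsets;

public class WireSize {
    // 16 (UUID) + 4 (length prefix) + customer bytes + 8 (double) + 8 (long)
    static int serializedSize(String customerId) {
        return 16 + 4 + customerId.getBytes(StandardCharsets.UTF_8).length + 8 + 8;
    }

    public static void main(String[] args) {
        // "customer-42" is 11 UTF-8 bytes, so the whole record is 47 bytes
        System.out.println(serializedSize("customer-42")); // 47
    }
}
```

Compare that to a JSON encoding of the same event, which repeats every field name in each record.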
Wiring into ProducerFactory and ConsumerFactory
@Configuration
public class CustomSerializerConfig {

    @Bean
    public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                OrderPlacedEventSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                OrderPlacedEventDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
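If you rely on Spring Boot's auto-configured factories instead of defining them yourself, the same class-based wiring can be expressed in configuration properties (the com.example package is a placeholder for wherever your serializers live; note this route instantiates by class name, so it cannot constructor-inject dependencies):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=order-service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.OrderPlacedEventSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.OrderPlacedEventDeserializer
```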
Alternatively, pass instances directly to avoid reflection:
@Bean
public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
    return new DefaultKafkaProducerFactory<>(
            Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
            new StringSerializer(),
            new OrderPlacedEventSerializer()
    );
}

@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "order-service"
            ),
            new StringDeserializer(),
            new OrderPlacedEventDeserializer()
    );
}
Passing instances is cleaner when you need to inject dependencies into the serializer.
Injecting Dependencies into a Deserializer
If your deserializer needs Spring beans (e.g. a schema version registry), construct it as a Spring bean:
@Component
public class VersionedOrderDeserializer implements Deserializer<OrderPlacedEvent> {

    private final SchemaVersionRegistry schemaRegistry;

    public VersionedOrderDeserializer(SchemaVersionRegistry schemaRegistry) {
        this.schemaRegistry = schemaRegistry;
    }

    @Override
    public OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null) return null;    // tombstone
        int version = data[0] & 0xFF;     // first byte is the schema version
        return schemaRegistry.deserialize(version, data, 1);
    }
}
Wire it into the factory as a @Bean method parameter — Spring injects the component automatically:
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory(
        VersionedOrderDeserializer deserializer) {
    return new DefaultKafkaConsumerFactory<>(
            baseProps(), new StringDeserializer(), deserializer);
}
Adding a Version Byte
For forward compatibility, prefix serialized bytes with a version number:
@Override
public byte[] serialize(String topic, OrderPlacedEvent event) {
    if (event == null) return null;
    byte[] payload = serializeV1(event);          // existing binary encoding
    byte[] result = new byte[payload.length + 1];
    result[0] = 1;                                // schema version = 1
    System.arraycopy(payload, 0, result, 1, payload.length);
    return result;
}

@Override
public OrderPlacedEvent deserialize(String topic, byte[] data) {
    if (data == null) return null;
    int version = data[0] & 0xFF;
    return switch (version) {
        case 1 -> deserializeV1(data, 1);         // payload starts at offset 1
        case 2 -> deserializeV2(data, 1);
        default -> throw new SerializationException("Unknown schema version: " + version);
    };
}
Protobuf Example
For teams already using Protobuf:
public class OrderEventSerializer implements Serializer<OrderEventProto.OrderPlacedEvent> {

    @Override
    public byte[] serialize(String topic, OrderEventProto.OrderPlacedEvent event) {
        if (event == null) return null;
        return event.toByteArray();
    }
}

public class OrderEventDeserializer implements Deserializer<OrderEventProto.OrderPlacedEvent> {

    @Override
    public OrderEventProto.OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return OrderEventProto.OrderPlacedEvent.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize Protobuf", e);
        }
    }
}
Testing Custom Serializers
Unit test the round-trip without Kafka:
@Test
void roundTrip() {
    var serializer = new OrderPlacedEventSerializer();
    var deserializer = new OrderPlacedEventDeserializer();

    OrderPlacedEvent original = new OrderPlacedEvent(
            UUID.randomUUID().toString(), "customer-42", 149.99, Instant.now().toEpochMilli());

    byte[] bytes = serializer.serialize("orders", original);
    OrderPlacedEvent restored = deserializer.deserialize("orders", bytes);

    assertThat(restored.getOrderId()).isEqualTo(original.getOrderId());
    assertThat(restored.getCustomerId()).isEqualTo(original.getCustomerId());
    assertThat(restored.getTotalAmount()).isEqualTo(original.getTotalAmount());
    assertThat(restored.getPlacedAt()).isEqualTo(original.getPlacedAt());
}

@Test
void nullHandling() {
    assertThat(new OrderPlacedEventSerializer().serialize("orders", null)).isNull();
    assertThat(new OrderPlacedEventDeserializer().deserialize("orders", null)).isNull();
}
Key Takeaways
- Implement Serializer<T> and Deserializer<T> — just two methods each, both working with raw byte[]
- Always handle null — Kafka may call serialize(topic, null) for tombstone records
- Throw SerializationException on error — Kafka's error handling infrastructure catches it
- Pass serializer/deserializer instances directly to DefaultKafkaProducerFactory / DefaultKafkaConsumerFactory when you need Spring dependency injection
- Add a version byte as the first byte — it costs 1 byte and gives you schema evolution for free
- Unit-test the round-trip before integration testing against a real broker
Next: Message Headers: Metadata, Routing, and Custom Header Propagation — read and write Kafka record headers to propagate trace IDs, correlation IDs, and routing metadata across services.