Custom Serializers and Deserializers

When to Write a Custom Serializer

Spring Kafka ships JSON support out of the box (JsonSerializer / JsonDeserializer); Avro is covered by Confluent's serializers rather than Spring Kafka itself. You need a custom serializer when:

  • Your team uses Protobuf or MessagePack and wants native support
  • You need a compact binary format for high-throughput topics (pricing ticks, sensor readings)
  • You’re integrating with a legacy system that publishes a fixed binary protocol
  • You want deterministic serialization for event deduplication or content-addressed storage

The Serializer and Deserializer Interfaces

// org.apache.kafka.common.serialization
public interface Serializer<T> extends Closeable {
    byte[] serialize(String topic, T data);
    default void configure(Map<String, ?> configs, boolean isKey) {}
    default void close() {}
}

public interface Deserializer<T> extends Closeable {
    T deserialize(String topic, byte[] data);
    default void configure(Map<String, ?> configs, boolean isKey) {}
    default void close() {}
}

Both interfaces are simple: implement serialize / deserialize, optionally override configure for runtime configuration and close for resource cleanup. Since Kafka 2.1 each interface also has a default overload that receives the record Headers; override it only if you need header access.


Example: Custom Binary OrderPlacedEvent Serializer

The goal: serialize OrderPlacedEvent to a compact binary format without JSON overhead.

flowchart LR
    subgraph Event["OrderPlacedEvent"]
        F1["orderId (UUID → 16 bytes)"]
        F2["customerId (length-prefixed UTF-8)"]
        F3["totalAmount (double → 8 bytes)"]
        F4["placedAt (epoch millis → 8 bytes)"]
    end
    Event -->|"serialize"| Bytes["[16][len][str bytes][8][8]"]
    Bytes -->|"deserialize"| Event2["OrderPlacedEvent"]

Serializer

public class OrderPlacedEventSerializer implements Serializer<OrderPlacedEvent> {

    @Override
    public byte[] serialize(String topic, OrderPlacedEvent event) {
        if (event == null) return null;

        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);

            // orderId as UUID bytes (fixed 16 bytes)
            UUID id = UUID.fromString(event.getOrderId());
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());

            // customerId as length-prefixed UTF-8
            byte[] customerBytes = event.getCustomerId().getBytes(StandardCharsets.UTF_8);
            out.writeInt(customerBytes.length);
            out.write(customerBytes);

            // totalAmount and placedAt as fixed-width primitives
            out.writeDouble(event.getTotalAmount());
            out.writeLong(event.getPlacedAt());

            return baos.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize OrderPlacedEvent", e);
        }
    }
}

Deserializer

public class OrderPlacedEventDeserializer implements Deserializer<OrderPlacedEvent> {

    @Override
    public OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null) return null;

        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

            long mostSig  = in.readLong();
            long leastSig = in.readLong();
            String orderId = new UUID(mostSig, leastSig).toString();

            int len = in.readInt();
            byte[] customerBytes = new byte[len];
            in.readFully(customerBytes);
            String customerId = new String(customerBytes, StandardCharsets.UTF_8);

            double totalAmount = in.readDouble();
            long placedAt = in.readLong();

            return new OrderPlacedEvent(orderId, customerId, totalAmount, placedAt);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize OrderPlacedEvent", e);
        }
    }
}

Wiring into ProducerFactory and ConsumerFactory

@Configuration
public class CustomSerializerConfig {

    @Bean
    public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            OrderPlacedEventSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            OrderPlacedEventDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Alternatively, pass instances directly to avoid reflection:

@Bean
public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
    return new DefaultKafkaProducerFactory<>(
        Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
        new StringSerializer(),
        new OrderPlacedEventSerializer()
    );
}

@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "order-service"
        ),
        new StringDeserializer(),
        new OrderPlacedEventDeserializer()
    );
}

Passing instances is cleaner when you need constructor-injected dependencies in the serializer, and it also sidesteps the no-arg-constructor requirement that class-name configuration imposes.


Injecting Dependencies into a Deserializer

If your deserializer needs Spring beans (e.g. a schema version registry), construct it as a Spring bean:

@Component
public class VersionedOrderDeserializer implements Deserializer<OrderPlacedEvent> {

    private final SchemaVersionRegistry schemaRegistry;

    public VersionedOrderDeserializer(SchemaVersionRegistry schemaRegistry) {
        this.schemaRegistry = schemaRegistry;
    }

    @Override
    public OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) return null;  // tombstones and empty payloads
        int version = data[0] & 0xFF;  // first byte is schema version
        return schemaRegistry.deserialize(version, data, 1);
    }
}
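SchemaVersionRegistry is a hypothetical collaborator in the example above. A minimal sketch that maps version numbers to parsing functions might look like this (names and signature are assumptions, not a Spring Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a registry mapping schema version -> parser for that layout.
public class SchemaVersionRegistry {

    @FunctionalInterface
    public interface Parser<T> {
        T parse(byte[] data, int offset);
    }

    private final Map<Integer, Parser<Object>> parsers = new HashMap<>();

    // Register a parser for one schema version.
    public <T> void register(int version, Parser<T> parser) {
        parsers.put(version, (data, offset) -> parser.parse(data, offset));
    }

    // Dispatch to the parser registered for this version.
    @SuppressWarnings("unchecked")
    public <T> T deserialize(int version, byte[] data, int offset) {
        Parser<Object> parser = parsers.get(version);
        if (parser == null) {
            throw new IllegalArgumentException("Unknown schema version: " + version);
        }
        return (T) parser.parse(data, offset);
    }
}
```

Declaring it as a @Component (or @Bean) lets Spring inject it into the deserializer shown above.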

Wire it into the factory by declaring it as a parameter of the @Bean method; Spring injects the component automatically:

@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory(
        VersionedOrderDeserializer deserializer) {
    return new DefaultKafkaConsumerFactory<>(baseProps(), new StringDeserializer(), deserializer);
}

Adding a Version Byte

For forward compatibility, prefix serialized bytes with a version number:

@Override
public byte[] serialize(String topic, OrderPlacedEvent event) {
    if (event == null) return null;
    byte[] payload = serializeV1(event);
    byte[] result = new byte[payload.length + 1];
    result[0] = 1;  // schema version = 1
    System.arraycopy(payload, 0, result, 1, payload.length);
    return result;
}

@Override
public OrderPlacedEvent deserialize(String topic, byte[] data) {
    if (data == null || data.length == 0) return null;
    int version = data[0] & 0xFF;
    return switch (version) {
        case 1 -> deserializeV1(data, 1);
        case 2 -> deserializeV2(data, 1);
        default -> throw new SerializationException("Unknown schema version: " + version);
    };
}
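The framing itself is easy to isolate and test without Kafka. VersionFraming below is an illustrative sketch of just the prepend-and-read step:

```java
public class VersionFraming {

    // Prepend a one-byte schema version to an already-serialized payload.
    static byte[] frame(int version, byte[] payload) {
        byte[] result = new byte[payload.length + 1];
        result[0] = (byte) version;
        System.arraycopy(payload, 0, result, 1, payload.length);
        return result;
    }

    // Read the version back; the payload starts at offset 1.
    // Java bytes are signed, so versions above 127 need the & 0xFF mask.
    static int version(byte[] framed) {
        return framed[0] & 0xFF;
    }

    public static void main(String[] args) {
        byte[] framed = frame(2, new byte[]{10, 20, 30});
        System.out.println(version(framed)); // 2
        System.out.println(framed.length);   // 4
    }
}
```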

Protobuf Example

For teams already using Protobuf:

public class OrderEventSerializer implements Serializer<OrderEventProto.OrderPlacedEvent> {
    @Override
    public byte[] serialize(String topic, OrderEventProto.OrderPlacedEvent event) {
        if (event == null) return null;
        return event.toByteArray();
    }
}

public class OrderEventDeserializer implements Deserializer<OrderEventProto.OrderPlacedEvent> {
    @Override
    public OrderEventProto.OrderPlacedEvent deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return OrderEventProto.OrderPlacedEvent.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize Protobuf", e);
        }
    }
}

Testing Custom Serializers

Unit test the round-trip without Kafka:

@Test
void roundTrip() {
    var serializer = new OrderPlacedEventSerializer();
    var deserializer = new OrderPlacedEventDeserializer();

    OrderPlacedEvent original = new OrderPlacedEvent(
        UUID.randomUUID().toString(), "customer-42", 149.99, Instant.now().toEpochMilli()
    );

    byte[] bytes = serializer.serialize("orders", original);
    OrderPlacedEvent restored = deserializer.deserialize("orders", bytes);

    assertThat(restored.getOrderId()).isEqualTo(original.getOrderId());
    assertThat(restored.getCustomerId()).isEqualTo(original.getCustomerId());
    assertThat(restored.getTotalAmount()).isEqualTo(original.getTotalAmount());
    assertThat(restored.getPlacedAt()).isEqualTo(original.getPlacedAt());
}

@Test
void nullHandling() {
    assertThat(new OrderPlacedEventSerializer().serialize("orders", null)).isNull();
    assertThat(new OrderPlacedEventDeserializer().deserialize("orders", null)).isNull();
}

Key Takeaways

  • Implement Serializer<T> and Deserializer<T> — one required method each, converting between your type and raw byte[]
  • Always handle null — Kafka may call serialize(topic, null) for tombstone records
  • Throw SerializationException on error — Kafka’s error handling infrastructure catches it
  • Pass serializer/deserializer instances directly to DefaultKafkaProducerFactory / DefaultKafkaConsumerFactory when you need Spring dependency injection
  • Add a version byte as the first byte — it costs 1 byte and gives you schema evolution for free
  • Unit-test the round-trip before integration testing against a real broker

Next: Message Headers: Metadata, Routing, and Custom Header Propagation — read and write Kafka record headers to propagate trace IDs, correlation IDs, and routing metadata across services.