Avro Serialization with Confluent Schema Registry

Why Avro and Schema Registry?

JSON has no schema enforcement — a producer can change a field name and silently break every consumer. Avro + Schema Registry solves this:

  • Avro gives you a compact binary format with a schema definition
  • Schema Registry stores and versions schemas, enforces compatibility rules, and prevents breaking changes from reaching consumers

flowchart LR
    subgraph Producer["Order Service"]
        E["OrderPlacedEvent"] -->|"KafkaAvroSerializer"| SR["Schema Registry\n(register/lookup schema)"]
        SR --> Bytes["[magic byte] + [schema id (4 bytes)] + [avro payload]"]
    end
    subgraph Broker["Kafka"]
        Bytes --> Topic["orders"]
    end
    subgraph Consumer["Inventory Service"]
        Topic -->|"KafkaAvroDeserializer"| SR2["Schema Registry\n(lookup by schema_id)"]
        SR2 --> Event["OrderPlacedEvent (Java)"]
    end

Every Avro message is prefixed with a single magic byte (0) followed by a 4-byte schema ID. The deserializer fetches the schema from the registry by that ID and uses it to decode the payload; no class name travels on the wire.
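
To make the framing concrete, here is a minimal sketch (a hypothetical helper, not part of the Confluent libraries) that pulls the schema ID out of a serialized record:

import java.nio.ByteBuffer;

public final class WireFormatInspector {

    // Confluent wire format: [magic byte 0][4-byte schema ID, big-endian][Avro binary payload]
    public static int schemaIdOf(byte[] serialized) {
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        byte magic = buffer.get();          // always 0 in the current format
        if (magic != 0) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return buffer.getInt();             // the registry-assigned schema ID
    }
}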


Docker Compose Setup

Add Schema Registry alongside Kafka. The broker needs two data listeners: PLAINTEXT (advertised as kafka:29092) for containers on the Compose network such as Schema Registry, and PLAINTEXT_HOST (advertised as localhost:9092) for applications running on the host:

version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

Maven Dependencies

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>

<!-- Avro code generation from .avsc files -->
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

The kafka-avro-serializer artifact is hosted in Confluent's own Maven repository rather than Maven Central, so add it to your pom:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Defining the Avro Schema

Create src/main/avro/OrderPlacedEvent.avsc:

{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "orderId",    "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount","type": "double"},
    {"name": "currency",   "type": "string", "default": "USD"},
    {"name": "placedAt",   "type": "long",   "logicalType": "timestamp-millis"}
  ]
}

Run mvn generate-sources; the plugin generates com.example.events.OrderPlacedEvent with builders, getters, and Avro schema metadata (the timestamp-millis field is exposed as java.time.Instant).
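
If you need the schema at runtime, for tests or tooling, the generated class embeds it. A quick sketch:

// Generated SpecificRecord classes expose their schema via a static accessor
import org.apache.avro.Schema;
import com.example.events.OrderPlacedEvent;

Schema schema = OrderPlacedEvent.getClassSchema();
System.out.println(schema.toString(true));   // pretty-printed JSON, same content as the .avsc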


Producer Configuration

@Configuration
public class AvroProducerConfig {

    @Bean
    public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Sending an event — use the generated builder:

OrderPlacedEvent event = OrderPlacedEvent.newBuilder()
    .setOrderId(UUID.randomUUID().toString())
    .setCustomerId("customer-123")
    .setTotalAmount(149.99)
    .setCurrency("USD")
    .setPlacedAt(Instant.now())
    .build();

kafkaTemplate.send("orders", event.getOrderId(), event);

On the first send, KafkaAvroSerializer registers the schema with Schema Registry under the subject orders-value and receives a schema ID. Subsequent sends use the cached ID, so there is no further registry round trip.
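
If you want confirmation that the record was actually written, inspect the CompletableFuture returned by send. A sketch assuming Spring Kafka 3.x (note that registry and serialization errors, such as an incompatible schema, are thrown synchronously from send itself):

kafkaTemplate.send("orders", event.getOrderId(), event)
    .whenComplete((result, ex) -> {
        if (ex != null) {
            // Broker-side failures surface here (timeouts, not enough replicas, ...)
            log.error("Failed to publish order {}", event.getOrderId(), ex);
        } else {
            log.info("Published order {} to partition {} at offset {}",
                    event.getOrderId(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }
    });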


Consumer Configuration

@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
            avroListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

SPECIFIC_AVRO_READER_CONFIG = true tells the deserializer to return the generated specific class (OrderPlacedEvent) rather than a generic GenericRecord.
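
With the specific reader enabled, a listener receives the generated class directly. A minimal sketch of the consuming side:

@KafkaListener(topics = "orders", containerFactory = "avroListenerContainerFactory")
public void onOrderPlaced(OrderPlacedEvent event) {
    // Typed getters come straight from the generated class
    log.info("Reserving stock for order {}: {} {}",
            event.getOrderId(), event.getTotalAmount(), event.getCurrency());
}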


Generic Record (Without Code Generation)

When you don’t control the schema or don’t want code generation:

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

@KafkaListener(topics = "orders", containerFactory = "avroListenerContainerFactory")
public void onOrder(GenericRecord record) {
    String orderId = record.get("orderId").toString();
    double amount  = (double) record.get("totalAmount");
    log.info("Order {} for {}", orderId, amount);
}

GenericRecord works without generated classes but loses type safety.
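
Because a GenericRecord is decoded with the writer's schema, a field added in a later schema version is simply absent from records written before it existed. A defensive sketch:

// Check the writer schema before reading fields that may not exist in older records
Schema writerSchema = record.getSchema();
String currency = writerSchema.getField("currency") != null
        ? record.get("currency").toString()
        : "USD";   // fall back to the schema default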


Schema Evolution and Compatibility

Schema Registry enforces a compatibility mode per subject (by default <topic>-value for record values, e.g. orders-value). The default mode is BACKWARD: a new schema must be able to read data written with the old schema.

flowchart TD
    subgraph Old["Schema v1"]
        F1["orderId: string"]
        F2["customerId: string"]
        F3["totalAmount: double"]
    end

    subgraph New["Schema v2"]
        G1["orderId: string"]
        G2["customerId: string"]
        G3["totalAmount: double"]
        G4["currency: string (default: 'USD')"]
        G5["shippingAddress: string (default: null)"]
    end

    Old -->|"BACKWARD compatible\n(new reader can read old messages\ndefaults fill missing fields)"| New

Compatibility rules:

Mode       Adding fields        Removing fields                Default required?
BACKWARD   ✓ (with default)     ✓                              Yes, for new fields
FORWARD    ✓                    ✓ (field had a default)        No
FULL       ✓ (with default)     ✓ (field had a default)        Yes
NONE       anything goes        anything goes                  No

Safe schema changes:

  • Add an optional field with a default value ✓
  • Remove a field that had a default value ✓

Breaking schema changes:

  • Rename a field ✗
  • Change a field’s type ✗
  • Remove a required field ✗
  • Add a required field without a default ✗

Checking Compatibility Before Registering

# Test if a new schema is compatible before registering
curl -X POST \
  http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"OrderPlacedEvent\",...}"}'

# Response: {"is_compatible": true}
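
The same check can run programmatically, which is handy in a CI step. A sketch assuming the Confluent SchemaRegistryClient API (from kafka-schema-registry-client, pulled in transitively by kafka-avro-serializer):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

// Test the candidate schema against the latest registered version of the subject
boolean compatible = client.testCompatibility(
        "orders-value", new AvroSchema(OrderPlacedEvent.getClassSchema()));

if (!compatible) {
    throw new IllegalStateException("Schema change would break consumers of orders-value");
}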

application.properties Alternative

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.properties.schema.registry.url=http://localhost:8081

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.properties.specific.avro.reader=true

Key Takeaways

  • Avro + Schema Registry gives you compact binary messages with enforced schema contracts — producers cannot silently break consumers
  • Each message carries a magic byte plus a 4-byte schema ID before the Avro payload; the deserializer uses the ID to fetch the schema from the registry
  • Use SPECIFIC_AVRO_READER_CONFIG = true to get generated Java classes; use false for GenericRecord when you don’t control the schema
  • Schema Registry defaults to BACKWARD compatibility — always add new fields with defaults
  • Never rename or change field types — use a new field name and deprecate the old one
  • Check compatibility with the REST API before registering a new schema version in CI

Next: Custom Serializers and Deserializers — implement Serializer<T> and Deserializer<T> from scratch for Protobuf, MessagePack, or any custom binary format.