# Avro Serialization with Confluent Schema Registry

## Why Avro and Schema Registry?
JSON has no schema enforcement — a producer can change a field name and silently break every consumer. Avro + Schema Registry solves this:
- Avro gives you a compact binary format with a schema definition
- Schema Registry stores and versions schemas, enforces compatibility rules, and prevents breaking changes from reaching consumers
```mermaid
flowchart LR
    subgraph Producer["Order Service"]
        E["OrderPlacedEvent"] -->|"KafkaAvroSerializer"| SR["Schema Registry\n(register/lookup schema)"]
        SR --> Bytes["[magic byte] + [schema_id (4 bytes)] + [avro payload]"]
    end
    subgraph Broker["Kafka"]
        Bytes --> Topic["orders"]
    end
    subgraph Consumer["Inventory Service"]
        Topic -->|"KafkaAvroDeserializer"| SR2["Schema Registry\n(lookup by schema_id)"]
        SR2 --> Event["OrderPlacedEvent (Java)"]
    end
```
Every Avro message is prefixed with a single magic byte (`0x00`) followed by a 4-byte schema ID. The deserializer fetches the matching schema from the registry and uses it to decode the payload; no class name travels on the wire.
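To make the framing concrete, here is a minimal sketch that unpacks the prefix by hand (the `WireFormat` class name is ours; in practice `KafkaAvroDeserializer` does this for you):

```java
import java.nio.ByteBuffer;

// Sketch: decode the Confluent wire format prefix from a raw record value.
public final class WireFormat {

    public static int schemaId(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw);
        byte magic = buf.get();              // always 0x00 in the current framing
        if (magic != 0) {
            throw new IllegalArgumentException("Not Confluent wire format");
        }
        int schemaId = buf.getInt();         // 4-byte big-endian schema ID
        // buf.remaining() bytes are the Avro-encoded payload
        return schemaId;
    }
}
```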
## Docker Compose Setup
Add Schema Registry alongside Kafka:
```yaml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
```

The broker needs two data listeners: `kafka:29092` for containers on the Compose network (Schema Registry bootstraps against this one) and `localhost:9092` for applications on the host. Advertising only `localhost:9092` would break Schema Registry's connection from inside Docker.
## Maven Dependencies
```xml
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>
```

```xml
<!-- Avro code generation from .avsc files (goes under <build><plugins>) -->
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```
Confluent packages are in the Confluent Maven repository — add it to your pom:
```xml
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
```
## Defining the Avro Schema

Create `src/main/avro/OrderPlacedEvent.avsc`:
```json
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "placedAt", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```

Note that `logicalType` must sit inside the type object; written next to `"type": "long"` at the field level it is silently ignored and the field is treated as a plain long.
Run `mvn generate-sources`: the plugin generates `com.example.events.OrderPlacedEvent` with builders, getters, and Avro schema metadata. The `timestamp-millis` logical type maps to `java.time.Instant` in the generated class.
## Producer Configuration
```java
import java.util.HashMap;
import java.util.Map;

import com.example.events.OrderPlacedEvent;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class AvroProducerConfig {

    @Bean
    public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
Sending an event with the generated builder:

```java
OrderPlacedEvent event = OrderPlacedEvent.newBuilder()
        .setOrderId(UUID.randomUUID().toString())
        .setCustomerId("customer-123")
        .setTotalAmount(149.99)
        .setCurrency("USD")
        .setPlacedAt(Instant.now())  // timestamp-millis fields take java.time.Instant
        .build();

kafkaTemplate.send("orders", event.getOrderId(), event);
```
On first send, KafkaAvroSerializer registers the schema with Schema Registry and receives a schema ID. Subsequent sends use the cached ID — no round trip.
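You can check what was registered with the registry client that ships with the serializer. A quick sketch (the second constructor argument is the client's local cache capacity; with the default TopicNameStrategy, the value schema for the `orders` topic lives under the subject `orders-value`):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class RegistryInspect {
    public static void main(String[] args) throws Exception {
        var client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // Fetch the newest schema version registered for the orders topic's values
        SchemaMetadata latest = client.getLatestSchemaMetadata("orders-value");
        System.out.printf("id=%d version=%d%n", latest.getId(), latest.getVersion());
    }
}
```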
## Consumer Configuration
```java
import java.util.HashMap;
import java.util.Map;

import com.example.events.OrderPlacedEvent;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
            avroListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
`SPECIFIC_AVRO_READER_CONFIG = true` tells the deserializer to return the generated specific class (`OrderPlacedEvent`) rather than a generic `GenericRecord`.
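With the specific reader enabled, a listener can take the generated type directly. A sketch (the `OrderListener` class and logger names are ours):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.example.events.OrderPlacedEvent;

@Component
public class OrderListener {

    private static final Logger log = LoggerFactory.getLogger(OrderListener.class);

    @KafkaListener(topics = "orders", containerFactory = "avroListenerContainerFactory")
    public void onOrder(OrderPlacedEvent event) {
        // Typed access: no casts and no string-keyed field lookups
        log.info("Order {}: {} {}", event.getOrderId(),
                event.getTotalAmount(), event.getCurrency());
    }
}
```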
## Generic Record (Without Code Generation)
When you don’t control the schema or don’t want code generation:
```java
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
```

```java
@KafkaListener(topics = "orders", containerFactory = "avroListenerContainerFactory")
public void onOrder(GenericRecord record) {
    String orderId = record.get("orderId").toString();   // Avro strings arrive as Utf8
    double amount = (double) record.get("totalAmount");
    log.info("Order {} for {}", orderId, amount);
}
```
GenericRecord works without generated classes but loses type safety.
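It also changes failure modes: in recent Avro versions, `GenericRecord.get()` can throw `AvroRuntimeException` for a field name the writer's schema does not contain, so guard fields that only newer schema versions carry. A defensive sketch with a hypothetical `discountCode` field:

```java
import org.apache.avro.generic.GenericRecord;

// Read a field that older writer schemas may not have. "discountCode" is a
// hypothetical field used only for illustration.
static String discountCodeOf(GenericRecord record) {
    if (record.getSchema().getField("discountCode") == null) {
        return null;                                 // not in this writer's schema version
    }
    Object value = record.get("discountCode");
    return value == null ? null : value.toString();  // Avro strings arrive as Utf8
}
```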
## Schema Evolution and Compatibility

Schema Registry enforces a compatibility mode per subject (with the default TopicNameStrategy, a topic's values register under the subject `<topic>-value`). The default mode is BACKWARD: the new schema must be able to read data written with the old schema.
```mermaid
flowchart TD
    subgraph Old["Schema v1"]
        F1["orderId: string"]
        F2["customerId: string"]
        F3["totalAmount: double"]
    end
    subgraph New["Schema v2"]
        G1["orderId: string"]
        G2["customerId: string"]
        G3["totalAmount: double"]
        G4["currency: string (default: 'USD')"]
        G5["shippingAddress: [null, string] (default: null)"]
    end
    Old -->|"BACKWARD compatible\n(new reader can read old messages\ndefaults fill missing fields)"| New
```

(In Avro, a `null` default requires a union type with `null` listed first, hence `[null, string]` for `shippingAddress`.)
Compatibility rules:

| Mode | Add a field | Remove a field | Defaults required? |
|---|---|---|---|
| BACKWARD | ✓ with a default | ✓ | On added fields |
| FORWARD | ✓ | ✓ if the old schema had a default | No |
| FULL | ✓ with a default | ✓ if the old schema had a default | On added fields |
| NONE | anything goes | anything goes | No |
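The mode can be changed per subject (or globally) through the REST API or the Java client. A sketch using the same registry client, assuming you want the stricter FULL mode for the orders subject:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class SetCompatibility {
    public static void main(String[] args) throws Exception {
        var client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // Pin orders-value to FULL: changes must be both backward and forward compatible
        client.updateCompatibility("orders-value", "FULL");
    }
}
```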
Safe schema changes:
- Add an optional field with a default value ✓
- Remove a field that had a default value ✓
Breaking schema changes:
- Rename a field ✗
- Change a field’s type ✗
- Remove a required field ✗
- Add a required field without a default ✗
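To make the safe path concrete, here is the v2 shape from the diagram built with Avro's `SchemaBuilder` (a sketch; in a real project you would evolve the .avsc file and regenerate):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// v2 adds two fields, both with defaults, so a v2 reader can still
// decode v1 data: the defaults fill in the missing fields.
Schema v2 = SchemaBuilder.record("OrderPlacedEvent")
        .namespace("com.example.events")
        .fields()
        .requiredString("orderId")
        .requiredString("customerId")
        .requiredDouble("totalAmount")
        .name("currency").type().stringType().stringDefault("USD")
        .optionalString("shippingAddress")  // union ["null","string"], default null
        .endRecord();
```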
## Checking Compatibility Before Registering
```bash
# Test if a new schema is compatible before registering
curl -X POST \
  http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"OrderPlacedEvent\",...}"}'
# Response: {"is_compatible": true}
```
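The same check works from Java, which is convenient as a CI gate. A sketch (the `CompatibilityGate` class is ours; `testCompatibility` evaluates the candidate against the subject's configured mode):

```java
import java.nio.file.Files;
import java.nio.file.Path;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CompatibilityGate {
    public static void main(String[] args) throws Exception {
        // Fail the build if the candidate schema violates the subject's compatibility mode
        String candidate = Files.readString(Path.of("src/main/avro/OrderPlacedEvent.avsc"));
        var client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        if (!client.testCompatibility("orders-value", new AvroSchema(candidate))) {
            throw new IllegalStateException("Incompatible schema change for orders-value");
        }
    }
}
```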
## application.properties Alternative

```properties
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.properties.specific.avro.reader=true
```
## Key Takeaways
- Avro + Schema Registry gives you compact binary messages with enforced schema contracts — producers cannot silently break consumers
- The schema ID travels in a small prefix on every message (a magic byte plus a 4-byte ID); the deserializer uses it to fetch the schema from the registry
- Use `SPECIFIC_AVRO_READER_CONFIG = true` to get generated Java classes; use `false` for `GenericRecord` when you don't control the schema
- Schema Registry defaults to BACKWARD compatibility; always add new fields with defaults
- Never rename or change field types — use a new field name and deprecate the old one
- Check compatibility with the REST API before registering a new schema version in CI
Next: Custom Serializers and Deserializers — implement `Serializer<T>` and `Deserializer<T>` from scratch for Protobuf, MessagePack, or any custom binary format.