Part 9 of 37
Producer @Bean Configuration: Beyond application.properties
Why @Bean Configuration?
application.properties is convenient for a single producer, but insufficient when you need:
- Multiple producers with different serializers (e.g. one for JSON events, one for Avro)
- Different settings per environment built at runtime (not just property substitution)
- Producers sending to different clusters (e.g. primary + DR cluster)
- Programmatic validation of configuration at startup
flowchart TB
subgraph PropertiesApproach["application.properties Approach"]
P1["Single producer config\nspring.kafka.producer.*\n✓ Simple\n✗ One producer only\n✗ No runtime logic"]
end
subgraph BeanApproach["@Bean Approach"]
B1["ProducerFactory\n(creates KafkaProducer instances)"]
B2["KafkaTemplate\n(wraps ProducerFactory)"]
B1 --> B2
Note["✓ Multiple factories/templates\n✓ Runtime configuration\n✓ Different serializers per template\n✓ Custom ProducerFactory subclasses"]
end
The ProducerFactory
ProducerFactory<K, V> is responsible for creating KafkaProducer instances. DefaultKafkaProducerFactory is the standard implementation. KafkaTemplate delegates all producer operations to its ProducerFactory.
classDiagram
class ProducerFactory~K,V~ {
<<interface>>
+createProducer() Producer~K,V~
+getConfigurationProperties() Map
}
class DefaultKafkaProducerFactory~K,V~ {
-Map configs
-Serializer keySerializer
-Serializer valueSerializer
+DefaultKafkaProducerFactory(Map configs)
+createProducer() Producer~K,V~
}
class KafkaTemplate~K,V~ {
-ProducerFactory factory
+send(topic, key, value) CompletableFuture
+sendDefault(key, value) CompletableFuture
+flush()
}
ProducerFactory <|.. DefaultKafkaProducerFactory
KafkaTemplate --> ProducerFactory
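To make that delegation concrete, here is a minimal sketch of using a ProducerFactory directly, which is roughly what KafkaTemplate does for you on every send (the config values and topic name are illustrative):

```java
// Sketch: what KafkaTemplate does under the hood (illustrative config).
Map<String, Object> config = Map.of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);

Producer<String, String> producer = factory.createProducer();
producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
producer.close(); // returns the shared producer to the factory; does not destroy it
```

Note that DefaultKafkaProducerFactory caches a single physical producer by default, so close() here releases the handle back to the factory rather than tearing down the connection.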
Basic @Bean Producer Configuration
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Connection
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Serialization
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Reliability
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Performance
config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Type info for JsonSerializer
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
All Producer Configuration Properties
private Map<String, Object> baseProducerConfig() {
Map<String, Object> config = new HashMap<>();
// ── CONNECTION ───────────────────────────────────────────────────────────
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service-producer");
// ── SERIALIZATION ────────────────────────────────────────────────────────
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// ── RELIABILITY (covered in articles 10, 11, 12) ─────────────────────────
config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all ISR
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // retry indefinitely
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // no duplicates
// ── BATCHING ─────────────────────────────────────────────────────────────
config.put(ProducerConfig.LINGER_MS_CONFIG, 5); // wait up to 5ms to fill batch
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batch size
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32MB total buffer
// ── TIMEOUTS ─────────────────────────────────────────────────────────────
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30s broker response wait
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2min total send deadline
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // 100ms between retries
// ── COMPRESSION ──────────────────────────────────────────────────────────
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // lz4, gzip, zstd, none
// ── METADATA ─────────────────────────────────────────────────────────────
config.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000); // refresh metadata every 5min
config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // max time send() blocks on full buffer
return config;
}
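A base map like this is easiest to reuse by copying it and overriding only what differs per producer. A sketch (the bean name and the specific overrides are illustrative, not from the configuration above):

```java
@Bean
public ProducerFactory<String, Object> bulkProducerFactory() {
    // Start from the shared base and override only throughput-related settings.
    Map<String, Object> config = new HashMap<>(baseProducerConfig());
    config.put(ProducerConfig.LINGER_MS_CONFIG, 50);       // wait longer, fill bigger batches
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);   // 64KB
    config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
    return new DefaultKafkaProducerFactory<>(config);
}
```

Copying into a fresh HashMap matters: mutating the returned base map in place would silently change every factory built from it.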
Multiple Producers: Different Serializers
A common need: one producer for JSON domain events, another for plain string commands, and a third for Avro-encoded analytics records.
flowchart TB
OrderSvc["Order Service"]
subgraph Producers
JsonFactory["jsonProducerFactory\nJsonSerializer\nacks=all, idempotent"]
StringFactory["stringProducerFactory\nStringSerializer\nacks=1, fast"]
AvroFactory["avroProducerFactory\nKafkaAvroSerializer\nacks=all"]
end
subgraph Templates
JsonTemplate["jsonKafkaTemplate"]
StringTemplate["stringKafkaTemplate"]
AvroTemplate["avroKafkaTemplate"]
end
JsonFactory --> JsonTemplate
StringFactory --> StringTemplate
AvroFactory --> AvroTemplate
OrderSvc -->|domain events| JsonTemplate
OrderSvc -->|commands| StringTemplate
OrderSvc -->|analytics| AvroTemplate
@Configuration
public class MultiProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// ── JSON PRODUCER (domain events — high reliability) ─────────────────────
@Bean
public ProducerFactory<String, Object> jsonProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
@Primary // default KafkaTemplate if none specified
public KafkaTemplate<String, Object> jsonKafkaTemplate(
@Qualifier("jsonProducerFactory") ProducerFactory<String, Object> factory) {
return new KafkaTemplate<>(factory);
}
// ── STRING PRODUCER (commands — low latency, tolerate some loss) ──────────
@Bean
public ProducerFactory<String, String> stringProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1"); // leader ack only, faster
config.put(ProducerConfig.LINGER_MS_CONFIG, 0);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate(
@Qualifier("stringProducerFactory") ProducerFactory<String, String> factory) {
return new KafkaTemplate<>(factory);
}
}
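The same pattern covers the multi-cluster case from the introduction: a second factory pointing at a different bootstrap list. A sketch, assuming a kafka.dr.bootstrap-servers property you would define yourself:

```java
// Hypothetical DR-cluster producer; the property name is an assumption.
@Bean
public ProducerFactory<String, Object> drProducerFactory(
        @Value("${kafka.dr.bootstrap-servers}") String drBootstrapServers) {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, drBootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Object> drKafkaTemplate(
        @Qualifier("drProducerFactory") ProducerFactory<String, Object> factory) {
    return new KafkaTemplate<>(factory);
}
```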
Inject the right template by type or qualifier:
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, Object> jsonKafkaTemplate;
private final KafkaTemplate<String, String> stringKafkaTemplate;
public OrderEventPublisher(
@Qualifier("jsonKafkaTemplate") KafkaTemplate<String, Object> jsonKafkaTemplate,
@Qualifier("stringKafkaTemplate") KafkaTemplate<String, String> stringKafkaTemplate) {
this.jsonKafkaTemplate = jsonKafkaTemplate;
this.stringKafkaTemplate = stringKafkaTemplate;
}
public void publishOrderPlaced(OrderPlacedEvent event) {
jsonKafkaTemplate.send("orders", event.orderId(), event);
}
public void sendInventoryCommand(String productId, String command) {
stringKafkaTemplate.send("inventory-commands", productId, command);
}
}
Runtime Configuration with Environment Profiles
@Configuration
public class KafkaProducerConfig {
@Autowired
private Environment env;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
env.getRequiredProperty("spring.kafka.bootstrap-servers"));
config.put(ProducerConfig.CLIENT_ID_CONFIG,
env.getRequiredProperty("spring.application.name") + "-producer");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Different acks per environment
if (env.acceptsProfiles(Profiles.of("prod", "staging"))) {
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
} else {
config.put(ProducerConfig.ACKS_CONFIG, "1"); // faster for local dev
}
return new DefaultKafkaProducerFactory<>(config);
}
}
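The "programmatic validation at startup" benefit mentioned at the top can be as simple as checks that run when the bean is created, so a bad configuration fails fast instead of surfacing on the first send. A sketch (the validation rules here are examples, not a standard API):

```java
@Bean
public ProducerFactory<String, Object> validatedProducerFactory() {
    Map<String, Object> config = baseProducerConfig(); // helper shown earlier
    String servers = (String) config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
    if (servers == null || servers.isBlank()) {
        throw new IllegalStateException("bootstrap.servers must be set");
    }
    // Idempotence requires acks=all; catch the mismatch before the producer does.
    // (acks=-1 is equivalent to "all"; this simplified check accepts only "all".)
    if (Boolean.TRUE.equals(config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
            && !"all".equals(config.get(ProducerConfig.ACKS_CONFIG))) {
        throw new IllegalStateException("enable.idempotence=true requires acks=all");
    }
    return new DefaultKafkaProducerFactory<>(config);
}
```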
Observing Producer Metrics
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(
ProducerFactory<String, Object> producerFactory) {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
// Observe send success/failure via ProducerListener
template.setProducerListener(new ProducerListener<>() {
@Override
public void onSuccess(ProducerRecord<String, Object> record,
RecordMetadata metadata) {
log.debug("[KAFKA] Sent: topic={} partition={} offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
}
@Override
public void onError(ProducerRecord<String, Object> record,
RecordMetadata metadata,
Exception exception) {
log.error("[KAFKA] Send failed: topic={} key={}",
record.topic(), record.key(), exception);
}
});
return template;
}
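Beyond per-send callbacks, the underlying producer exposes its internal metrics (send rate, batch size, buffer usage) through the template's metrics() method. A sketch of reading them directly; in practice you would forward these to your metrics system rather than log them:

```java
// Read the underlying producer's Kafka metrics via the template.
public void logProducerMetrics(KafkaTemplate<String, Object> template) {
    template.metrics().forEach((name, metric) -> {
        // Other useful names include batch-size-avg and buffer-available-bytes.
        if (name.name().equals("record-send-rate")) {
            log.info("[KAFKA] {} = {}", name.name(), metric.metricValue());
        }
    });
}
```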
Compression Comparison
quadrantChart
title Compression Trade-offs
x-axis Low CPU --> High CPU
y-axis Low Ratio --> High Ratio
quadrant-1 High CPU, High Ratio
quadrant-2 Low CPU, High Ratio
quadrant-3 Low CPU, Low Ratio
quadrant-4 High CPU, Low Ratio
none: [0.05, 0.05]
snappy: [0.3, 0.55]
lz4: [0.25, 0.45]
zstd: [0.55, 0.85]
gzip: [0.8, 0.7]
Recommendations:
- snappy: good default — low CPU, decent compression (2–3× for JSON)
- lz4: slightly better throughput than snappy with a similar ratio
- zstd: best compression ratio (Kafka 2.1+) — use when network bandwidth is the bottleneck
- gzip: avoid — high CPU for compression similar to zstd
- none: use only on fast internal networks where bandwidth is not a concern
Key Takeaways
- `ProducerFactory` creates `KafkaProducer` instances; `KafkaTemplate` wraps a `ProducerFactory` for Spring-friendly sending
- `@Bean` configuration allows multiple producers with different serializers, reliability settings, and target clusters
- Use `@Primary` to mark the default `KafkaTemplate` and `@Qualifier` to inject a specific one
- `ProducerListener` provides hooks for observing send success and failure without changing call-site code
- `snappy` compression is the best default for JSON payloads; use `zstd` for network-bandwidth-constrained deployments
- Configure `acks`, `retries`, and `enable.idempotence` per environment — `acks=all` + idempotence in production
Next: Producer Acknowledgments: acks, min.insync.replicas, and Data Durability — understand exactly what data durability guarantees different acks settings provide and when you need each one.