Producer @Bean Configuration: Beyond application.properties

Why @Bean Configuration?

application.properties is convenient for a single producer, but insufficient when you need:

  • Multiple producers with different serializers (e.g. one for JSON events, one for Avro)
  • Different settings per environment built at runtime (not just property substitution)
  • Producers sending to different clusters (e.g. primary + DR cluster)
  • Programmatic validation of configuration at startup

flowchart TB
    subgraph PropertiesApproach["application.properties Approach"]
        P1["Single producer config\nspring.kafka.producer.*\n✓ Simple\n✗ One producer only\n✗ No runtime logic"]
    end

    subgraph BeanApproach["@Bean Approach"]
        B1["ProducerFactory\n(creates KafkaProducer instances)"]
        B2["KafkaTemplate\n(wraps ProducerFactory)"]
        B1 --> B2
        Note["✓ Multiple factories/templates\n✓ Runtime configuration\n✓ Different serializers per template\n✓ Custom ProducerFactory subclasses"]
    end
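
Of the needs listed above, programmatic validation at startup is the one that properties files cannot express at all. The following is a minimal sketch of what such a check might look like, assuming a hypothetical helper class named ProducerConfigValidator (it is not part of Spring Kafka). It uses the plain string config keys so it runs without any Kafka dependency, and it checks one rule the Kafka client enforces: an idempotent producer requires acks=all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Spring Kafka class: inspects a producer config map
// before it is handed to a ProducerFactory.
final class ProducerConfigValidator {

    private ProducerConfigValidator() {
    }

    /** Returns human-readable problems; an empty list means the map passed. */
    static List<String> validate(Map<String, Object> config) {
        List<String> problems = new ArrayList<>();

        Object servers = config.get("bootstrap.servers");
        if (servers == null || servers.toString().trim().isEmpty()) {
            problems.add("bootstrap.servers must be set");
        }

        // An idempotent producer is only accepted with acks=all (alias "-1").
        boolean idempotent =
                Boolean.parseBoolean(String.valueOf(config.get("enable.idempotence")));
        String acks = String.valueOf(config.getOrDefault("acks", "all"));
        if (idempotent && !(acks.equals("all") || acks.equals("-1"))) {
            problems.add("enable.idempotence=true requires acks=all, but acks=" + acks);
        }
        return problems;
    }

    /** Fails fast, for example when called from a @Bean method during startup. */
    static Map<String, Object> requireValid(Map<String, Object> config) {
        List<String> problems = validate(config);
        if (!problems.isEmpty()) {
            throw new IllegalStateException("Invalid producer config: " + problems);
        }
        return config;
    }
}
```

A @Bean method could wrap its map in requireValid(...) before constructing the factory, so a misconfigured service fails at startup rather than on the first send.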

The ProducerFactory

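ProducerFactory<K, V> is responsible for creating the KafkaProducer instances that do the actual sending. DefaultKafkaProducerFactory is the standard implementation; by default (without transactions) it shares a single producer across all callers. KafkaTemplate does not create producers itself: it obtains one from its ProducerFactory for each send operation.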

classDiagram
    class ProducerFactory~K,V~ {
        <<interface>>
        +createProducer() KafkaProducer
        +getConfigurationProperties() Map
    }

    class DefaultKafkaProducerFactory~K,V~ {
        -Map configs
        -Serializer keySerializer
        -Serializer valueSerializer
        +DefaultKafkaProducerFactory(Map configs)
        +createProducer() KafkaProducer
    }

    class KafkaTemplate~K,V~ {
        -ProducerFactory factory
        +send(topic, key, value) CompletableFuture
        +sendDefault(key, value) CompletableFuture
        +flush()
    }

    ProducerFactory <|.. DefaultKafkaProducerFactory
    KafkaTemplate --> ProducerFactory

Basic @Bean Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // Connection
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Serialization
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Type info for JsonSerializer
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

Common Producer Configuration Properties

private Map<String, Object> baseProducerConfig() {
    Map<String, Object> config = new HashMap<>();

    // ── CONNECTION ───────────────────────────────────────────────────────────
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service-producer");

    // ── SERIALIZATION ────────────────────────────────────────────────────────
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    // ── RELIABILITY (covered in articles 10, 11, 12) ─────────────────────────
    config.put(ProducerConfig.ACKS_CONFIG, "all");                 // wait for all ISR
    config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);  // retry indefinitely
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);    // no duplicates from producer retries

    // ── BATCHING ─────────────────────────────────────────────────────────────
    config.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // wait up to 5ms to fill batch
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);   // 32KB batch size
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32MB total buffer

    // ── TIMEOUTS ─────────────────────────────────────────────────────────────
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);       // 30s broker response wait
    config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);     // 2min total send deadline
    config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);           // 100ms between retries

    // ── COMPRESSION ──────────────────────────────────────────────────────────
    config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // lz4, gzip, zstd, none

    // ── METADATA ─────────────────────────────────────────────────────────────
    config.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000); // refresh metadata every 5min
    config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);      // max time send() blocks (buffer full or metadata unavailable)

    return config;
}
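
The three timeout settings in the listing are not independent: the Kafka client expects delivery.timeout.ms to be at least linger.ms + request.timeout.ms and rejects an explicitly configured value that is smaller. A small sketch of that arithmetic follows. TimeoutBudget is a hypothetical name, and the fallback values are assumptions mirroring commonly documented client defaults, which can differ between client versions.

```java
import java.util.Map;

// Hypothetical helper: checks the relationship between the producer timeouts.
final class TimeoutBudget {

    private TimeoutBudget() {
    }

    /** Reads a numeric setting whether it was stored as Integer, Long or String. */
    static long millis(Map<String, Object> config, String key, long fallback) {
        Object value = config.get(key);
        return value == null ? fallback : Long.parseLong(value.toString());
    }

    /** True when delivery.timeout.ms >= linger.ms + request.timeout.ms. */
    static boolean isConsistent(Map<String, Object> config) {
        // Fallbacks are assumed defaults; verify them against your client version.
        long linger = millis(config, "linger.ms", 0L);
        long request = millis(config, "request.timeout.ms", 30_000L);
        long delivery = millis(config, "delivery.timeout.ms", 120_000L);
        return delivery >= linger + request;
    }
}
```

With the values from the listing, 5 + 30000 = 30005 ms, which fits comfortably inside the 120000 ms delivery deadline. Lowering delivery.timeout.ms to 20000 without also lowering request.timeout.ms would violate the constraint.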

Multiple Producers: Different Serializers

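A common need: one producer for JSON domain events, another for plain string commands, and a third for Avro-encoded analytics records. The configuration below shows the first two; an Avro producer follows the same pattern with its own value serializer.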

flowchart TB
    OrderSvc["Order Service"]

    subgraph Producers
        JsonFactory["jsonProducerFactory\nJsonSerializer\nacks=all, idempotent"]
        StringFactory["stringProducerFactory\nStringSerializer\nacks=1, fast"]
        AvroFactory["avroProducerFactory\nKafkaAvroSerializer\nacks=all"]
    end

    subgraph Templates
        JsonTemplate["jsonKafkaTemplate"]
        StringTemplate["stringKafkaTemplate"]
        AvroTemplate["avroKafkaTemplate"]
    end

    JsonFactory --> JsonTemplate
    StringFactory --> StringTemplate
    AvroFactory --> AvroTemplate

    OrderSvc -->|domain events| JsonTemplate
    OrderSvc -->|commands| StringTemplate
    OrderSvc -->|analytics| AvroTemplate

@Configuration
public class MultiProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // ── JSON PRODUCER (domain events — high reliability) ─────────────────────

    @Bean
    public ProducerFactory<String, Object> jsonProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    @Primary  // default KafkaTemplate if none specified
    public KafkaTemplate<String, Object> jsonKafkaTemplate(
            @Qualifier("jsonProducerFactory") ProducerFactory<String, Object> factory) {
        return new KafkaTemplate<>(factory);
    }

    // ── STRING PRODUCER (commands — low latency, tolerate some loss) ──────────

    @Bean
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "1");  // leader ack only, faster
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);  // idempotence requires acks=all
        config.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> stringKafkaTemplate(
            @Qualifier("stringProducerFactory") ProducerFactory<String, String> factory) {
        return new KafkaTemplate<>(factory);
    }
}

Inject the template you need with @Qualifier. The qualifier value is the bean name, which defaults to the @Bean method name:

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> jsonKafkaTemplate;
    private final KafkaTemplate<String, String> stringKafkaTemplate;

    public OrderEventPublisher(
            @Qualifier("jsonKafkaTemplate") KafkaTemplate<String, Object> jsonKafkaTemplate,
            @Qualifier("stringKafkaTemplate") KafkaTemplate<String, String> stringKafkaTemplate) {
        this.jsonKafkaTemplate = jsonKafkaTemplate;
        this.stringKafkaTemplate = stringKafkaTemplate;
    }

    public void publishOrderPlaced(OrderPlacedEvent event) {
        jsonKafkaTemplate.send("orders", event.orderId(), event);
    }

    public void sendInventoryCommand(String productId, String command) {
        stringKafkaTemplate.send("inventory-commands", productId, command);
    }
}
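
Once a service has several factories, most of each config map is repeated. One way to keep the differences visible is to build every map from a shared base plus a small set of overrides, which also covers the primary + DR cluster case from the introduction. This is only a sketch: ProducerConfigs and the host names in the usage note are hypothetical, and plain string keys are used so the block runs without a Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: derives per-producer config maps from shared settings.
final class ProducerConfigs {

    private ProducerConfigs() {
    }

    /** Settings every producer in the service shares. */
    static Map<String, Object> base(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        config.put("compression.type", "snappy");
        return config;
    }

    /** Returns a new map: the base settings with the overrides applied on top. */
    static Map<String, Object> with(Map<String, Object> base, Map<String, ?> overrides) {
        Map<String, Object> merged = new HashMap<>(base); // copy, never mutate the base
        merged.putAll(overrides);
        return merged;
    }
}
```

For example, with(base("primary-kafka:9092"), Map.of("acks", "all", "enable.idempotence", true)) yields the reliable variant, and applying Map.of("bootstrap.servers", "dr-kafka:9092") on top of that result gives the same producer pointed at a DR cluster. Each resulting map can be passed to its own DefaultKafkaProducerFactory.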

Runtime Configuration with Environment Profiles

@Configuration
public class KafkaProducerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            env.getRequiredProperty("spring.kafka.bootstrap-servers"));
        config.put(ProducerConfig.CLIENT_ID_CONFIG,
            env.getRequiredProperty("spring.application.name") + "-producer");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Different acks per environment
        if (env.acceptsProfiles(Profiles.of("prod", "staging"))) {
            config.put(ProducerConfig.ACKS_CONFIG, "all");
            config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        } else {
            config.put(ProducerConfig.ACKS_CONFIG, "1");  // faster for local dev
            config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);  // idempotence requires acks=all
        }

        return new DefaultKafkaProducerFactory<>(config);
    }
}
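
The profile branch above is easier to unit-test when it lives in a pure function that takes the profile names and returns the settings, with no Spring context involved. A minimal sketch under that assumption follows. ReliabilitySettings is a hypothetical name, and the profile names are taken from the snippet above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: maps active profile names to reliability settings.
final class ReliabilitySettings {

    // Profiles that get the strict settings, as in the configuration above.
    private static final Set<String> STRICT_PROFILES = Set.of("prod", "staging");

    private ReliabilitySettings() {
    }

    static Map<String, Object> forProfiles(Set<String> activeProfiles) {
        Map<String, Object> config = new HashMap<>();
        boolean strict = activeProfiles.stream().anyMatch(STRICT_PROFILES::contains);
        if (strict) {
            config.put("acks", "all");
            config.put("enable.idempotence", true);
            config.put("retries", Integer.MAX_VALUE);
        } else {
            config.put("acks", "1");                  // faster for local dev
            config.put("enable.idempotence", false);  // idempotence needs acks=all
        }
        return config;
    }
}
```

The @Bean method could then call config.putAll(ReliabilitySettings.forProfiles(new HashSet<>(Arrays.asList(env.getActiveProfiles())))). One difference to be aware of: unlike acceptsProfiles, this sketch looks only at the explicitly active profiles and ignores default profiles.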

Observing Producer Metrics

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(
        ProducerFactory<String, Object> producerFactory) {

    KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);

    // Observe send success/failure via ProducerListener
    template.setProducerListener(new ProducerListener<>() {
        @Override
        public void onSuccess(ProducerRecord<String, Object> record,
                              RecordMetadata metadata) {
            log.debug("[KAFKA] Sent: topic={} partition={} offset={}",
                metadata.topic(), metadata.partition(), metadata.offset());
        }

        @Override
        public void onError(ProducerRecord<String, Object> record,
                            RecordMetadata metadata,
                            Exception exception) {
            log.error("[KAFKA] Send failed: topic={} key={}",
                record.topic(), record.key(), exception);
        }
    });

    return template;
}
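
The listener above only logs. To turn the same callbacks into numbers, the listener could delegate to a small thread-safe counter. The sketch below assumes a hypothetical SendOutcomeCounter class; a production service would more likely register these counts with a metrics library, which is outside the scope of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: per-topic success/failure counts, safe to call from
// the producer's callback threads.
final class SendOutcomeCounter {

    private final Map<String, LongAdder> successes = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> failures = new ConcurrentHashMap<>();

    void recordSuccess(String topic) {
        successes.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    void recordFailure(String topic) {
        failures.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    long successCount(String topic) {
        LongAdder adder = successes.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    long failureCount(String topic) {
        LongAdder adder = failures.get(topic);
        return adder == null ? 0L : adder.sum();
    }
}
```

Inside the listener, onSuccess would call counter.recordSuccess(metadata.topic()) and onError would call counter.recordFailure(record.topic()). The error path uses the record's topic because the metadata argument may be null when a send fails.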

Compression Comparison

quadrantChart
    title Compression Trade-offs
    x-axis Low CPU --> High CPU
    y-axis Low Ratio --> High Ratio
    quadrant-1 High CPU, High Ratio
    quadrant-2 Low CPU, High Ratio
    quadrant-3 Low CPU, Low Ratio
    quadrant-4 High CPU, Low Ratio
    none: [0.05, 0.05]
    snappy: [0.3, 0.55]
    lz4: [0.25, 0.45]
    zstd: [0.55, 0.85]
    gzip: [0.8, 0.7]

Recommendations:

  • snappy: good default — low CPU, decent compression (2–3× for JSON)
  • lz4: slightly better throughput than snappy with similar ratio
  • zstd: best compression ratio (Kafka 2.1+) — use when network bandwidth is the bottleneck
  • gzip: generally avoid; it costs more CPU than zstd and does not compress better
  • none: use only on fast internal networks where bandwidth is not a concern
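
Compression ratios depend heavily on the payload, so it is worth measuring on data that resembles your own before committing to a codec. The JDK ships only gzip (snappy, lz4 and zstd need third-party libraries), so the sketch below uses gzip purely to illustrate the measurement. CompressionProbe and the sample event shape are hypothetical, and the number it produces says nothing about the other codecs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: measures how well a batch of JSON events compresses.
final class CompressionProbe {

    private CompressionProbe() {
    }

    /** Size in bytes of the payload after gzip compression. */
    static int gzipSize(byte[] payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size(); // stream is closed here, so the output is complete
    }

    /** Uncompressed size divided by compressed size. */
    static double ratio(byte[] payload) {
        return (double) payload.length / gzipSize(payload);
    }

    /** Builds a batch of similar-looking JSON events (made-up shape). */
    static byte[] sampleBatch(int events) {
        StringBuilder json = new StringBuilder();
        for (int i = 0; i < events; i++) {
            json.append("{\"orderId\":\"order-").append(i)
                .append("\",\"status\":\"PLACED\",\"currency\":\"EUR\"}\n");
        }
        return json.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

The probe works on a whole batch rather than a single event because the producer compresses record batches, not individual records. That is also why larger batch.size and linger.ms values tend to improve the achieved ratio.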

Key Takeaways

  • ProducerFactory creates KafkaProducer instances; KafkaTemplate wraps ProducerFactory for Spring-friendly sending
  • @Bean configuration allows multiple producers with different serializers, reliability settings, and target clusters
  • Use @Primary to mark the default KafkaTemplate and @Qualifier to inject a specific one
  • ProducerListener provides hooks for observing send success and failure without changing call-site code
  • snappy compression is a good default for JSON payloads; use zstd for network-bandwidth-constrained deployments
  • Configure acks, retries, and enable.idempotence per environment — acks=all + idempotence in production

Next: Producer Acknowledgments: acks, min.insync.replicas, and Data Durability — understand exactly what data durability guarantees different acks settings provide and when you need each one.