Consumer @Bean Configuration: ConcurrentKafkaListenerContainerFactory

Why @Bean Configuration?

application.properties covers the common cases. But real applications need multiple listener factories — one for orders with manual acknowledgment and concurrency 3, another for analytics events with batch listening and different deserializers. @Bean configuration gives you a factory per use case, full IDE support, and the ability to wire in custom components like error handlers and interceptors.


The Factory Relationship

flowchart TD
    CF["ConsumerFactory\n(connection + deserialization config)"]
    LCF["ConcurrentKafkaListenerContainerFactory\n(container behaviour config)"]
    MLCP["ContainerProperties\n(AckMode, poll timeout, idle)"]
    KL["@KafkaListener\ncontainerFactory = 'factoryBeanName'"]

    CF --> LCF
    LCF --> MLCP
    LCF --> KL

ConsumerFactory handles what to connect to and how to deserialize. ConcurrentKafkaListenerContainerFactory handles how the container behaves: concurrency, acknowledgment, error handling, interceptors.


Minimal @Bean Setup

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
            kafkaListenerContainerFactory(
                    ConsumerFactory<String, OrderPlacedEvent> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

When a @KafkaListener does not specify containerFactory, Spring looks for a bean named exactly kafkaListenerContainerFactory. Name it anything else and you must reference it explicitly.
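With the minimal setup above, a listener can therefore omit containerFactory entirely. A sketch (the topic name and orderService are illustrative):

```java
// Resolved by convention: no containerFactory attribute, so Spring uses
// the bean named exactly "kafkaListenerContainerFactory".
@KafkaListener(topics = "orders")
public void onOrder(OrderPlacedEvent event) {
    orderService.process(event);
}
```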


Concurrency

factory.setConcurrency(3);

Spawns 3 KafkaMessageListenerContainer instances — one thread per instance. Each thread owns a subset of the partitions assigned to the group.

flowchart TB
    subgraph Factory["ConcurrentKafkaListenerContainerFactory (concurrency=3)"]
        C0["Container-0\nThread-0 → orders-P0, P3"]
        C1["Container-1\nThread-1 → orders-P1, P4"]
        C2["Container-2\nThread-2 → orders-P2, P5"]
    end
    Factory --> Topic["orders (6 partitions)"]

Setting concurrency higher than the number of partitions leaves extra threads idle — they join the group but receive no partitions.
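The factory value is only a default; @KafkaListener can override it per listener. A sketch, assuming a property placeholder (the property key is illustrative):

```java
// Per-listener override: this listener runs 6 consumer threads even if
// the factory default is 3. The attribute is a String so it can be
// resolved from configuration.
@KafkaListener(
    topics = "orders",
    containerFactory = "orderListenerContainerFactory",
    concurrency = "${orders.listener.concurrency:6}"
)
public void onOrder(OrderPlacedEvent event) {
    orderService.process(event);
}
```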


AckMode

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

All AckMode values are available on ContainerProperties.AckMode. Setting it here applies to every listener that uses this factory.
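For orientation, the enum covers the full commit spectrum; a sketch of the common choices as comments:

```java
// RECORD           – commit after each record
// BATCH            – commit after each poll() batch (default)
// TIME             – commit at least every ackTime milliseconds
// COUNT            – commit after ackCount records
// COUNT_TIME       – whichever of TIME or COUNT triggers first
// MANUAL           – listener calls Acknowledgment.acknowledge(); committed with the batch
// MANUAL_IMMEDIATE – committed immediately when acknowledge() is called
factory.getContainerProperties()
       .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
```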


Poll Timeout

factory.getContainerProperties().setPollTimeout(3000); // milliseconds

How long poll() blocks waiting for records. Lower values reduce idle-detection latency; higher values reduce CPU overhead when the topic is quiet. Default is 5000ms.


Idle Event Threshold

Publish a ListenerContainerIdleEvent when no records are received for a given time — useful for circuit breakers or health checks:

factory.getContainerProperties().setIdleEventInterval(30_000L); // 30 seconds

@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    log.info("Container {} has been idle for {}ms",
        event.getListenerId(), event.getIdleTime());
}

Multiple Factories for Different Use Cases

@Configuration
public class KafkaConsumerConfig {

    // --- shared bootstrap config ---
    private Map<String, Object> baseConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    // --- order events: manual ack, 3 concurrent threads ---
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> orderConsumerFactory() {
        Map<String, Object> props = baseConsumerProps();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderPlacedEvent.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
            orderListenerContainerFactory() {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    // --- analytics events: batch listener, auto ack, high throughput ---
    @Bean
    public ConsumerFactory<String, String> analyticsConsumerFactory() {
        Map<String, Object> props = baseConsumerProps();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            analyticsListenerContainerFactory() {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(analyticsConsumerFactory());
        factory.setBatchListener(true);
        factory.setConcurrency(6);
        return factory;
    }
}

Reference the factory by bean name:

@KafkaListener(
    topics = "orders",
    containerFactory = "orderListenerContainerFactory"
)
public void onOrder(OrderPlacedEvent event, Acknowledgment ack) {
    orderService.process(event);
    ack.acknowledge();
}

@KafkaListener(
    topics = "page-views",
    containerFactory = "analyticsListenerContainerFactory"
)
public void onPageViews(List<String> batch) {
    analyticsService.record(batch);
}

Wiring in an Error Handler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
        orderListenerContainerFactory(DefaultErrorHandler errorHandler) {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
    factory.setConsumerFactory(orderConsumerFactory());
    factory.setConcurrency(3);
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    return new DefaultErrorHandler(
        new DeadLetterPublishingRecoverer(kafkaTemplate),
        new ExponentialBackOff(1000L, 2.0)
    );
}
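A common refinement, sketched here: failures that will never succeed on retry (validation errors, for example) can bypass the backoff and go straight to the recoverer. The exception class chosen is illustrative:

```java
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    var handler = new DefaultErrorHandler(
        new DeadLetterPublishingRecoverer(kafkaTemplate),
        new ExponentialBackOff(1000L, 2.0)
    );
    // Skip retries entirely for errors that can never succeed
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
```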

Wiring in a Record Interceptor

Intercept every record before it reaches your listener — useful for tracing, metrics, or logging:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
        orderListenerContainerFactory() {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
    factory.setConsumerFactory(orderConsumerFactory());
    factory.setRecordInterceptor(tracingInterceptor());
    return factory;
}

@Bean
public RecordInterceptor<String, OrderPlacedEvent> tracingInterceptor() {
    return (record, consumer) -> {
        Header traceHeader = record.headers().lastHeader("X-Trace-Id");
        if (traceHeader != null) {  // header may be absent — guard against NPE
            MDC.put("traceId", new String(traceHeader.value(), StandardCharsets.UTF_8));
        }
        return record;  // return null to skip this record
    };
}
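RecordInterceptor also exposes lifecycle callbacks: in recent spring-kafka versions, afterRecord runs once the listener returns, which is a natural place to clear the MDC so a trace id does not leak into the next record. A sketch using an anonymous class instead of a lambda:

```java
@Bean
public RecordInterceptor<String, OrderPlacedEvent> tracingInterceptor() {
    return new RecordInterceptor<>() {
        @Override
        public ConsumerRecord<String, OrderPlacedEvent> intercept(
                ConsumerRecord<String, OrderPlacedEvent> record,
                Consumer<String, OrderPlacedEvent> consumer) {
            Header header = record.headers().lastHeader("X-Trace-Id");
            if (header != null) {
                MDC.put("traceId", new String(header.value(), StandardCharsets.UTF_8));
            }
            return record;
        }

        @Override
        public void afterRecord(
                ConsumerRecord<String, OrderPlacedEvent> record,
                Consumer<String, OrderPlacedEvent> consumer) {
            MDC.remove("traceId"); // clean up after the listener has run
        }
    };
}
```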

Batch Listener Configuration

factory.setBatchListener(true);

Changes the listener method signature to accept List<ConsumerRecord<K, V>> or List<V>. Pair with AckMode.MANUAL to acknowledge the whole batch at once, or to re-seek from a failed record inside it via Acknowledgment.nack(index, sleep).

flowchart LR
    Poll["poll() → 100 records"] --> Batch["List<ConsumerRecord>"]
    Batch --> Listener["onBatch(List<OrderPlacedEvent> events,\n Acknowledgment ack)"]
    Listener -->|"ack.acknowledge()"| Commit["commitOffset(last+1)"]
    Listener -->|"ack.nack(idx, duration)"| Retry["Seek back to failed offset"]
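Putting the pieces together, a batch listener with manual acknowledgment might look like the sketch below. It assumes the referenced factory is configured with AckMode.MANUAL (unlike the auto-ack analytics factory shown earlier); analyticsService is illustrative:

```java
@KafkaListener(
    topics = "page-views",
    containerFactory = "analyticsListenerContainerFactory"
)
public void onBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (int i = 0; i < records.size(); i++) {
        try {
            analyticsService.record(records.get(i).value());
        } catch (Exception e) {
            // Re-deliver from the failed record after a 1s pause;
            // records before index i are treated as processed.
            ack.nack(i, Duration.ofSeconds(1));
            return;
        }
    }
    ack.acknowledge(); // commit the offset for the whole batch
}
```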

ContainerProperties Reference

| Property            | Method                         | Default | Notes                                |
|---------------------|--------------------------------|---------|--------------------------------------|
| AckMode             | setAckMode(...)                | BATCH   | Controls when offsets are committed  |
| Poll timeout        | setPollTimeout(ms)             | 5000    | How long poll() blocks               |
| Idle event interval | setIdleEventInterval(ms)       | null    | Fires ListenerContainerIdleEvent     |
| Sync commit timeout | setSyncCommitTimeout(Duration) | null    | Timeout for synchronous commits      |
| Ack time            | setAckTime(ms)                 | 5000    | Used with TIME AckMode               |
| Ack count           | setAckCount(n)                 | 1       | Used with COUNT AckMode              |

Key Takeaways

  • Name your primary factory kafkaListenerContainerFactory; a @KafkaListener without containerFactory resolves it by convention
  • Create multiple factories when listeners have different concurrency, AckMode, deserializers, or error handlers
  • Set concurrency ≤ number of partitions — extra threads stay idle
  • Wire CommonErrorHandler into the factory, not individually per listener
  • Use RecordInterceptor for cross-cutting concerns (tracing, metrics) without polluting business logic
  • setBatchListener(true) + AckMode.MANUAL is the most flexible pattern for high-throughput batch processing

Next: Filtering Messages with RecordFilterStrategy — filter records at the container level before they reach your listener, without any if/return boilerplate in business code.