Consumer @Bean Configuration: ConcurrentKafkaListenerContainerFactory
Why @Bean Configuration?
application.properties covers the common cases. But real applications need multiple listener factories — one for orders with manual acknowledgment and concurrency 3, another for analytics events with batch listening and different deserializers. @Bean configuration gives you a factory per use case, full IDE support, and the ability to wire in custom components like error handlers and interceptors.
The Factory Relationship
flowchart TD
CF["ConsumerFactory\n(connection + deserialization config)"]
LCF["ConcurrentKafkaListenerContainerFactory\n(container behaviour config)"]
MLCP["ContainerProperties\n(AckMode, poll timeout, idle)"]
KL["@KafkaListener\ncontainerFactory = 'factoryBeanName'"]
CF --> LCF
LCF --> MLCP
LCF --> KL
ConsumerFactory handles what to connect to and how to deserialize. ConcurrentKafkaListenerContainerFactory handles how the container behaves: concurrency, acknowledgment, error handling, interceptors.
Minimal @Bean Setup
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
kafkaListenerContainerFactory(
ConsumerFactory<String, OrderPlacedEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
}
When a @KafkaListener does not specify containerFactory, Spring looks for a bean named exactly kafkaListenerContainerFactory. Name it anything else and you must reference it explicitly.
Concurrency
factory.setConcurrency(3);
Creates 3 KafkaMessageListenerContainer instances, each with its own consumer and its own thread. All three consumers join the same group, so each one is assigned a subset of the topic's partitions.
flowchart TB
subgraph Factory["ConcurrentKafkaListenerContainerFactory (concurrency=3)"]
C0["Container-0\nThread-0 → orders-P0, P3"]
C1["Container-1\nThread-1 → orders-P1, P4"]
C2["Container-2\nThread-2 → orders-P2, P5"]
end
Factory --> Topic["orders (6 partitions)"]
Setting concurrency higher than the number of partitions leaves extra threads idle — they join the group but receive no partitions.
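To make the idle-thread case concrete, the sketch below deals partitions out to consumers round-robin, mirroring the layout in the diagram above. It is plain Java with no Kafka dependency; the `PartitionSpread` class and its methods are hypothetical names, and the real layout in a running application depends on the configured partition assignor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: spreads partition numbers over N consumers, round-robin. */
final class PartitionSpread {

    /** Returns one list of partition numbers per consumer; some lists may be empty. */
    static List<List<Integer>> assign(int partitions, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < concurrency; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perConsumer.get(p % concurrency).add(p);
        }
        return perConsumer;
    }

    /** Counts consumers that ended up with no partitions, i.e. idle threads. */
    static long idleConsumers(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3));        // [[0, 3], [1, 4], [2, 5]]
        System.out.println(idleConsumers(6, 8)); // 2
    }
}
```

With 6 partitions and a concurrency of 8, two consumers receive nothing, which is the idle-thread situation described above.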
AckMode
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
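ContainerProperties.AckMode defines RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, and MANUAL_IMMEDIATE. Setting it here applies to every listener that uses this factory. With MANUAL or MANUAL_IMMEDIATE, the listener must accept an Acknowledgment and call acknowledge() itself.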
Poll Timeout
factory.getContainerProperties().setPollTimeout(3000); // milliseconds
How long poll() blocks waiting for records. Lower values reduce idle-detection latency; higher values reduce CPU overhead when the topic is quiet. Default is 5000ms.
Idle Event Threshold
Publish a ListenerContainerIdleEvent when no records are received for a given time — useful for circuit breakers or health checks:
factory.getContainerProperties().setIdleEventInterval(30_000L); // 30 seconds
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
log.info("Container {} has been idle for {}ms",
event.getListenerId(), event.getIdleTime());
}
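One way to turn idle events into a health signal is to record the most recent idle time per listener and compare it with a limit. The sketch below is plain Java and is meant to be called from an idle-event handler such as `onIdle`; the `IdleHealth` class, its method names, and the limit are assumptions for illustration, not part of Spring Kafka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks how long each listener container has been idle. */
final class IdleHealth {

    private final long maxIdleMs;
    private final Map<String, Long> idleMsByListener = new ConcurrentHashMap<>();

    IdleHealth(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    /** Call from the idle-event handler with event.getListenerId() and event.getIdleTime(). */
    void onIdle(String listenerId, long idleMs) {
        idleMsByListener.put(listenerId, idleMs);
    }

    /** Call when records flow again so the listener is no longer counted as idle. */
    void onActivity(String listenerId) {
        idleMsByListener.remove(listenerId);
    }

    /** Healthy while no tracked listener has been idle longer than the limit. */
    boolean isHealthy() {
        return idleMsByListener.values().stream().allMatch(ms -> ms <= maxIdleMs);
    }
}
```

Keeping the state in a small class like this leaves the event handler as a one-line delegate and makes the threshold logic easy to unit test.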
Multiple Factories for Different Use Cases
@Configuration
public class KafkaConsumerConfig {
// --- shared bootstrap config ---
private Map<String, Object> baseConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
// --- order events: manual ack, 3 concurrent threads ---
@Bean
public ConsumerFactory<String, OrderPlacedEvent> orderConsumerFactory() {
Map<String, Object> props = baseConsumerProps();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderPlacedEvent.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
orderListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
factory.setConsumerFactory(orderConsumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
// --- analytics events: batch listener, auto ack, high throughput ---
@Bean
public ConsumerFactory<String, String> analyticsConsumerFactory() {
Map<String, Object> props = baseConsumerProps();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
analyticsListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(analyticsConsumerFactory());
factory.setBatchListener(true);
factory.setConcurrency(6);
return factory;
}
}
Reference the factory by bean name:
@KafkaListener(
topics = "orders",
containerFactory = "orderListenerContainerFactory"
)
public void onOrder(OrderPlacedEvent event, Acknowledgment ack) {
orderService.process(event);
ack.acknowledge();
}
@KafkaListener(
topics = "page-views",
containerFactory = "analyticsListenerContainerFactory"
)
public void onPageViews(List<String> batch) {
analyticsService.record(batch);
}
Wiring in an Error Handler
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
orderListenerContainerFactory(DefaultErrorHandler errorHandler) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
factory.setConsumerFactory(orderConsumerFactory());
factory.setConcurrency(3);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
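@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> kafkaTemplate) {
// A bare ExponentialBackOff has no effective retry limit, so cap the total retry time;
// otherwise the dead-letter recoverer is never reached. 30 seconds is an illustrative value.
ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(30_000L);
return new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
backOff
);
}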
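It helps to see which delays a 1000 ms initial interval with a multiplier of 2.0 produces. The sketch below is a simplified, plain-Java model of an exponential back-off with a per-delay cap and a total-time budget. It is not Spring's `ExponentialBackOff` implementation; the `BackOffSchedule` class name, the 30-second cap, and the budget values are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of an exponential back-off schedule. */
final class BackOffSchedule {

    /**
     * Returns the delays handed out before the time budget is spent.
     * A new delay is issued as long as the time already spent is below maxElapsedMs,
     * so pass a finite budget.
     */
    static List<Long> delays(long initialMs, double multiplier, long maxIntervalMs, long maxElapsedMs) {
        if (initialMs <= 0 || maxIntervalMs <= 0 || multiplier < 1.0) {
            throw new IllegalArgumentException("intervals must be positive and multiplier >= 1");
        }
        List<Long> result = new ArrayList<>();
        long next = Math.min(initialMs, maxIntervalMs);
        long elapsed = 0;
        while (elapsed < maxElapsedMs) {
            result.add(next);
            elapsed += next;
            // Grow the delay, but never beyond the per-delay cap
            next = Math.min((long) (next * multiplier), maxIntervalMs);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000L, 2.0, 30_000L, 10_000L)); // [1000, 2000, 4000, 8000]
    }
}
```

In this model an unlimited budget means the schedule never ends. The practical consequence for the error handler is that retries only stop, and the recoverer only runs, once the back-off has some limit on elapsed time or attempts.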
Wiring in a Record Interceptor
Intercept every record before it reaches your listener — useful for tracing, metrics, or logging:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
orderListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
factory.setConsumerFactory(orderConsumerFactory());
factory.setRecordInterceptor(tracingInterceptor());
return factory;
}
@Bean
public RecordInterceptor<String, OrderPlacedEvent> tracingInterceptor() {
return (record, consumer) -> {
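// The header is optional, so guard against null before decoding it
Header traceHeader = record.headers().lastHeader("X-Trace-Id");
if (traceHeader != null) {
MDC.put("traceId", new String(traceHeader.value(), StandardCharsets.UTF_8));
}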
return record; // return null to skip this record
};
}
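Producers do not always set X-Trace-Id, so it can help to decode the header defensively and fall back to a generated id. The helper below is plain Java operating on the raw header bytes (what `Header.value()` returns); the `TraceIds` class and its fallback policy are assumptions for illustration, not part of Spring Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper for reading a trace id out of a Kafka header value. */
final class TraceIds {

    /** Decodes the bytes as UTF-8, or generates a new id when the header is missing or blank. */
    static String fromHeader(byte[] headerValue) {
        if (headerValue == null) {
            return newId();
        }
        String decoded = new String(headerValue, StandardCharsets.UTF_8).trim();
        return decoded.isEmpty() ? newId() : decoded;
    }

    /** Fallback id so that log lines can still be correlated per record. */
    static String newId() {
        return UUID.randomUUID().toString();
    }
}
```

Inside an interceptor this would be used as `MDC.put("traceId", TraceIds.fromHeader(header == null ? null : header.value()))`, where `header` is the result of `lastHeader("X-Trace-Id")`.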
Batch Listener Configuration
factory.setBatchListener(true);
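Changes the listener method signature to accept List<ConsumerRecord<K, V>> or List<V>. Pair with AckMode.MANUAL to control commits yourself: ack.acknowledge() commits the whole batch, while ack.nack(index, sleep) commits the records before index and redelivers the record at index and everything after it.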
flowchart LR
Poll["poll() → 100 records"] --> Batch["List<ConsumerRecord>"]
Batch --> Listener["onBatch(List<OrderPlacedEvent> events,\n Acknowledgment ack)"]
Listener -->|"ack.acknowledge()"| Commit["commitOffset(last+1)"]
Listener -->|"ack.nack(idx, duration)"| Retry["Seek back to failed offset"]
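The two outcomes in the diagram can be modelled without a broker. The sketch below assumes a single partition and takes the offsets of one polled batch; `BatchOutcome`, `ackAll`, and `nackAt` are hypothetical names. It follows the documented `nack(index, sleep)` contract: records before the index are committed, and the record at the index plus everything after it is redelivered.

```java
import java.util.List;

/** Hypothetical model of what happens to one batch from a single partition. */
final class BatchOutcome {

    final long nextCommittedOffset; // offset the consumer group would resume from
    final List<Long> redelivered;   // offsets that will be delivered again

    private BatchOutcome(long nextCommittedOffset, List<Long> redelivered) {
        this.nextCommittedOffset = nextCommittedOffset;
        this.redelivered = redelivered;
    }

    /** acknowledge(): the whole batch is committed, i.e. last offset + 1. */
    static BatchOutcome ackAll(List<Long> offsets) {
        if (offsets.isEmpty()) {
            throw new IllegalArgumentException("batch must not be empty");
        }
        long last = offsets.get(offsets.size() - 1);
        return new BatchOutcome(last + 1, List.of());
    }

    /** nack(index, sleep): commit up to the failed record, redeliver from it onwards. */
    static BatchOutcome nackAt(List<Long> offsets, int index) {
        List<Long> again = List.copyOf(offsets.subList(index, offsets.size()));
        return new BatchOutcome(offsets.get(index), again);
    }
}
```

For a batch with offsets 40 to 43, failing at index 2 leaves the committed position at 42 and redelivers 42 and 43, so work done on 40 and 41 is not repeated.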
ContainerProperties Reference
| Property | Method | Default | Notes |
|---|---|---|---|
| AckMode | setAckMode(...) | BATCH | Controls when offsets are committed |
| Poll timeout | setPollTimeout(ms) | 5000 | How long poll() blocks |
| Idle event interval | setIdleEventInterval(ms) | null | Fires ListenerContainerIdleEvent |
| Sync commit timeout | setSyncCommitTimeout(Duration) | null | Timeout for synchronous commits |
| Ack time | setAckTime(ms) | 5000 | Used with the TIME and COUNT_TIME AckModes |
| Ack count | setAckCount(n) | 1 | Used with the COUNT and COUNT_TIME AckModes |
Key Takeaways
- Name your primary factory kafkaListenerContainerFactory; a @KafkaListener without containerFactory resolves it by convention
- Create multiple factories when listeners have different concurrency, AckMode, deserializers, or error handlers
- Set concurrency ≤ number of partitions; extra threads stay idle
- Wire CommonErrorHandler into the factory, not individually per listener
- Use RecordInterceptor for cross-cutting concerns (tracing, metrics) without polluting business logic
- setBatchListener(true) + AckMode.MANUAL is the most flexible pattern for high-throughput batch processing
Next: Filtering Messages with RecordFilterStrategy — filter records at the container level before they reach your listener, without any if/return boilerplate in business code.