Kafka Consumer in Spring Boot: @KafkaListener Basics
How @KafkaListener Works
@KafkaListener is a Spring Kafka annotation that registers a method as a Kafka consumer. Under the hood, Spring Kafka creates a ConcurrentMessageListenerContainer, which runs one consumer thread per unit of concurrency. Each thread continuously polls the broker and dispatches records to your method.
flowchart LR
Broker["Kafka Broker"]
subgraph Container["ConcurrentMessageListenerContainer"]
T1["Poll Thread 1\n(Partition 0)"]
T2["Poll Thread 2\n(Partition 1)"]
T3["Poll Thread 3\n(Partition 2)"]
end
Method["@KafkaListener\nvoid onOrderPlaced(...)"]
Broker -->|"fetch records"| T1
Broker -->|"fetch records"| T2
Broker -->|"fetch records"| T3
T1 -->|"deserialize + dispatch"| Method
T2 -->|"deserialize + dispatch"| Method
T3 -->|"deserialize + dispatch"| Method
Each poll thread handles one or more partitions. The method is called once per record (in record mode) or once per batch.
Maven Dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Minimal Configuration
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=inventory-service
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
auto-offset-reset=earliest means a consumer group with no committed offsets starts reading from the beginning of the topic. If the property is not set, Kafka defaults to latest, and the group receives only messages produced after it joins.
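The rule can be written down as a small decision function. This is a simplified model for illustration only, not Kafka's own code: the class name StartOffsetModel and its parameters are made up here, and the sketch ignores the case where a committed offset has fallen out of range.

```java
import java.util.OptionalLong;

/** Simplified model of where a consumer starts reading a partition. */
final class StartOffsetModel {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed offset already committed by the group, if any
     * @param policy    value of auto-offset-reset
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset the next produced record will receive
     */
    static long startOffset(OptionalLong committed, ResetPolicy policy,
                            long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }
}
```

For a partition holding offsets 0 to 119, a brand-new group starts at 0 with EARLIEST and at 120 with LATEST, while a group that already committed offset 42 resumes at 42 under either policy.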
Consuming String Messages
@Component
@Slf4j
public class OrderEventConsumer {
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderEvent(String message) {
log.info("Received: {}", message);
// process message
}
}
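@EnableKafka turns on listener support. Spring Boot's auto-configuration already applies it when spring-kafka is on the classpath, so the annotation below is optional and shown only for reference: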
@SpringBootApplication
@EnableKafka // usually auto-enabled by Spring Boot's auto-configuration
public class InventoryServiceApplication {
public static void main(String[] args) {
SpringApplication.run(InventoryServiceApplication.class, args);
}
}
Consuming JSON (POJO) Messages
Configure JSON deserialization:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.events
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.events.OrderPlacedEvent
@Component
@Slf4j
public class OrderEventConsumer {
private final InventoryService inventoryService;
public OrderEventConsumer(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(OrderPlacedEvent event) {
log.info("Processing order: orderId={} customerId={} total={}",
event.orderId(), event.customerId(), event.total());
inventoryService.reserveStock(event);
}
}
Accessing Record Metadata
Inject partition, offset, key, timestamp, and headers alongside the payload:
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(
@Payload OrderPlacedEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
@Header(value = "correlationId", required = false) String correlationId) {
log.info("[{}] order={} partition={} offset={} ts={}",
correlationId, event.orderId(), partition, offset, timestamp);
inventoryService.reserveStock(event);
}
Or receive the entire ConsumerRecord for full access:
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(ConsumerRecord<String, OrderPlacedEvent> record) {
String key = record.key();
OrderPlacedEvent event = record.value();
int partition = record.partition();
long offset = record.offset();
Headers headers = record.headers();
log.info("key={} partition={} offset={}", key, partition, offset);
inventoryService.reserveStock(event);
}
Listening to Multiple Topics
// Multiple topics in one listener
@KafkaListener(topics = {"orders", "order-updates"}, groupId = "inventory-service")
public void onOrderEvent(ConsumerRecord<String, String> record) {
log.info("Topic: {} key: {}", record.topic(), record.key());
}
// Topic pattern (regex) — matches any topic starting with "order"
@KafkaListener(topicPattern = "order.*", groupId = "inventory-service")
public void onAnyOrderTopic(ConsumerRecord<String, String> record) {
log.info("Topic: {}", record.topic());
}
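Before deploying a topicPattern, it can help to check which topic names the regex actually selects. The sketch below uses only java.util.regex and assumes the pattern must match the whole topic name (the behaviour of Matcher.matches()). The class name TopicPatternCheck and the sample topic names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Checks which topic names a topicPattern regex would select. */
final class TopicPatternCheck {

    /** Returns the topics whose full name matches the regex, in input order. */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches()) // whole-name match
                .collect(Collectors.toList());
    }
}
```

With the topics order, orders, order-updates, new-orders, and payments, the pattern order.* selects the first three. new-orders is excluded because the match is anchored at the start of the name.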
Batch Listening
Instead of receiving one record at a time, receive an entire poll batch:
spring.kafka.listener.type=batch
@KafkaListener(topics = "orders", groupId = "inventory-service-batch")
public void onOrderBatch(List<OrderPlacedEvent> events) {
log.info("Processing batch of {} orders", events.size());
inventoryService.reserveStockBatch(events);
}
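With full metadata per record in batch mode. Injecting Acknowledgment requires a manual ack mode (for example spring.kafka.listener.ack-mode=manual); in the default ack mode the container commits offsets itself and the parameter is not available: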
@KafkaListener(topics = "orders", groupId = "inventory-service-batch")
public void onOrderBatch(
List<ConsumerRecord<String, OrderPlacedEvent>> records,
Acknowledgment ack) {
log.info("Batch size: {}", records.size());
for (ConsumerRecord<String, OrderPlacedEvent> record : records) {
log.info(" offset={} key={}", record.offset(), record.key());
inventoryService.reserveStock(record.value());
}
ack.acknowledge();
}
The Consumer Thread Model
flowchart TB
subgraph ConsumerGroup["Consumer Group: inventory-service"]
subgraph Instance1["App Instance 1 (concurrency=2)"]
T1["Thread 1 → Partition 0"]
T2["Thread 2 → Partition 1"]
end
subgraph Instance2["App Instance 2 (concurrency=2)"]
T3["Thread 3 → Partition 2"]
T4["Thread 4 → Partition 3"]
end
end
subgraph Topic["Topic: orders (4 partitions)"]
P0["Partition 0"]
P1["Partition 1"]
P2["Partition 2"]
P3["Partition 3"]
end
P0 --> T1
P1 --> T2
P2 --> T3
P3 --> T4
Each thread polls its assigned partition(s) independently. concurrency sets the number of consumer threads per application instance. For maximum throughput, set it to the number of partitions you expect each instance to own; threads beyond that number receive no partitions and sit idle.
spring.kafka.listener.concurrency=3
Or per-listener:
@KafkaListener(topics = "orders", groupId = "inventory-service", concurrency = "3")
public void onOrderPlaced(OrderPlacedEvent event) { ... }
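A quick way to sanity-check a concurrency value is to compare the total number of consumer threads in the group with the partition count. The helper below is a back-of-the-envelope sketch, assuming a single topic and that every instance uses the same concurrency. ConcurrencyPlan and its method names are illustrative and not part of Spring Kafka.

```java
/** Back-of-the-envelope check of listener threads against partition count. */
final class ConcurrencyPlan {

    /** Total consumer threads the group runs across all instances. */
    static int totalThreads(int instances, int concurrencyPerInstance) {
        return instances * concurrencyPerInstance;
    }

    /** Threads left without a partition because the group has more consumers than partitions. */
    static int idleThreads(int partitions, int instances, int concurrencyPerInstance) {
        return Math.max(0, totalThreads(instances, concurrencyPerInstance) - partitions);
    }

    /** Smallest per-instance concurrency that gives every partition its own thread. */
    static int concurrencyForOneThreadPerPartition(int partitions, int instances) {
        return (partitions + instances - 1) / instances; // ceiling division
    }
}
```

For the 4-partition topic in the diagram, two instances with concurrency 2 leave no thread idle, while concurrency 3 on both instances creates six consumers and leaves two of them with nothing to read.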
The Poll Loop Internals
sequenceDiagram
participant Thread as Consumer Thread
participant Broker as Kafka Broker
participant Method as @KafkaListener Method
loop Every poll cycle
Thread->>Broker: poll(max.poll.records=500, timeout=5000ms)
Broker-->>Thread: Up to 500 records
loop For each record in batch
Thread->>Method: invoke(record)
Method-->>Thread: return (or throw)
end
Thread->>Broker: commit offsets (after the batch in the default BATCH ack mode)
Note over Thread: Heartbeats are sent by a background thread\n(independent of record processing)
end
Key settings:
| Property | Default | Controls |
|---|---|---|
| max.poll.records | 500 | Max records returned per poll |
| max.poll.interval.ms | 300,000 (5 min) | Max time between polls before consumer is considered dead |
| session.timeout.ms | 45,000 (45 sec) | Time without heartbeat before coordinator removes consumer |
| heartbeat.interval.ms | 3,000 (3 sec) | How often consumer sends heartbeats |
If your @KafkaListener method takes longer than max.poll.interval.ms to process a full batch, the consumer is kicked out of the group and a rebalance occurs. Either reduce max.poll.records or increase max.poll.interval.ms.
spring.kafka.consumer.properties.max.poll.records=50
spring.kafka.consumer.properties.max.poll.interval.ms=600000
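The two settings together imply an average time budget per record. The arithmetic can be sketched as below, assuming every poll returns a full batch of max.poll.records and ignoring fetch and commit overhead. PollBudget is a hypothetical helper written for this example.

```java
/** Rough per-record time budget implied by the poll settings. */
final class PollBudget {

    /** Average milliseconds each record may take before the poll interval is exceeded. */
    static long maxMillisPerRecord(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    /** True if a full batch at the given per-record cost finishes within the interval. */
    static boolean fitsInInterval(long millisPerRecord, int maxPollRecords, long maxPollIntervalMs) {
        return millisPerRecord * maxPollRecords <= maxPollIntervalMs;
    }
}
```

With the defaults (300,000 ms and 500 records) each record has about 600 ms on average. With the values above (600,000 ms and 50 records) the budget grows to 12,000 ms per record, so a listener that needs one second per record no longer risks eviction.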
@KafkaListener on the Class Level
Instead of one method, use a class with @KafkaHandler methods — one per message type:
@Component
@Slf4j
@KafkaListener(topics = "orders", groupId = "inventory-service")
public class OrderEventDispatcher {
@KafkaHandler
public void onOrderPlaced(OrderPlacedEvent event) {
log.info("OrderPlaced: {}", event.orderId());
}
@KafkaHandler
public void onOrderCancelled(OrderCancelledEvent event) {
log.info("OrderCancelled: {}", event.orderId());
}
@KafkaHandler(isDefault = true)
public void onUnknownEvent(Object event) {
log.warn("Unknown event type: {}", event.getClass().getSimpleName());
}
}
Spring Kafka picks the handler whose parameter type matches the deserialized payload. With JsonDeserializer, that type is usually taken from the __TypeId__ header (added by JsonSerializer). Covered in depth in @SendTo and @KafkaHandler.
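The idea of routing by payload type with a default fallback can be illustrated without any Kafka classes. The following is a toy model, not Spring Kafka's implementation: HandlerDispatchModel, on, and dispatch are invented names, and each registered function stands in for one @KafkaHandler method.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of class-level dispatch: pick a handler by payload type, else fall back. */
final class HandlerDispatchModel {

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    HandlerDispatchModel(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    /** Registers a handler for one payload type (stands in for a @KafkaHandler method). */
    <T> HandlerDispatchModel on(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Routes the payload to the first registered handler whose type accepts it. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload); // plays the role of isDefault = true
    }
}
```

Registering handlers for String and Integer and then dispatching a Double sends the Double to the default handler, in the same way an unrecognised event type reaches onUnknownEvent in the listener class.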
Starting and Stopping Listeners
By default all listeners start automatically. Control lifecycle explicitly:
@Autowired
private KafkaListenerEndpointRegistry registry;
// Stop a specific listener
public void stopInventoryListener() {
registry.getListenerContainer("inventory-listener").stop();
}
// Start it again
public void startInventoryListener() {
registry.getListenerContainer("inventory-listener").start();
}
Assign an explicit ID to reference it:
@KafkaListener(id = "inventory-listener", topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(OrderPlacedEvent event) { ... }
Complete Inventory Service Consumer
@Component
@Slf4j
public class InventoryConsumer {
private final InventoryService inventoryService;
public InventoryConsumer(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}
@KafkaListener(
id = "inventory-orders-listener",
topics = "orders",
groupId = "inventory-service",
concurrency = "3"
)
public void onOrderPlaced(
@Payload OrderPlacedEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("[inventory] Processing orderId={} partition={} offset={}",
event.orderId(), partition, offset);
try {
inventoryService.reserveStock(event);
} catch (InsufficientStockException e) {
log.warn("[inventory] Insufficient stock for orderId={}: {}",
event.orderId(), e.getMessage());
// publish StockUnavailableEvent back to Kafka
}
}
}
Key Takeaways
- @KafkaListener registers a method as a Kafka consumer; Spring Kafka manages the poll loop, deserialization, and thread lifecycle
- Use @Payload + @Header to access the payload and record metadata separately, or inject ConsumerRecord<K,V> for full access
- concurrency controls threads per listener; set it to match the number of partitions assigned to this instance
- auto-offset-reset=earliest starts new consumer groups from the beginning; latest only receives new messages
- Batch mode (spring.kafka.listener.type=batch) delivers a List<> of records per poll cycle, which is useful for bulk database operations
- If processing takes longer than max.poll.interval.ms, the consumer is evicted; reduce max.poll.records or increase the interval
- Assign an id to each listener to reference it in KafkaListenerEndpointRegistry for lifecycle control
Next: Consumer Groups: Parallel Processing and Partition Assignment Strategies — understand how Kafka assigns partitions to consumers, the different assignor strategies, and how to scale consumers.