Kafka Consumer in Spring Boot: @KafkaListener Basics

How @KafkaListener Works

@KafkaListener is a Spring Kafka annotation that registers a method as a Kafka consumer. Under the hood, Spring Kafka creates a ConcurrentMessageListenerContainer — a managed container that runs one or more consumer threads, each continuously polling the broker and dispatching records to your method.

flowchart LR
    Broker["Kafka Broker"]
    subgraph Container["ConcurrentMessageListenerContainer"]
        T1["Poll Thread 1\n(Partition 0)"]
        T2["Poll Thread 2\n(Partition 1)"]
        T3["Poll Thread 3\n(Partition 2)"]
    end
    Method["@KafkaListener\nvoid onOrderPlaced(...)"]

    Broker -->|"fetch records"| T1
    Broker -->|"fetch records"| T2
    Broker -->|"fetch records"| T3
    T1 -->|"deserialize + dispatch"| Method
    T2 -->|"deserialize + dispatch"| Method
    T3 -->|"deserialize + dispatch"| Method

Each poll thread handles one or more partitions. The method is called once per record (in record mode) or once per batch.
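To make this concrete, here is a rough sketch of what the annotation wires up, using Spring Kafka's container API directly. This is illustrative only — the consumerFactory bean is assumed to exist, and in practice you let the annotation do this for you:

```java
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

// Approximately what @KafkaListener(topics = "orders", concurrency = "3") builds
ConcurrentMessageListenerContainer<String, String> buildContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties props = new ContainerProperties("orders");
    props.setGroupId("inventory-service");
    // The annotated method plays the role of this callback
    props.setMessageListener((MessageListener<String, String>) record ->
            System.out.println("Received: " + record.value()));

    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, props);
    container.setConcurrency(3); // three poll threads; partitions are split among them
    return container;            // container.start() begins the poll loops
}
```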


Maven Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Minimal Configuration

# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=inventory-service
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest

auto-offset-reset=earliest makes a consumer group with no committed offsets start reading from the beginning of the topic. The default is latest — the group receives only messages produced after it first subscribes. Once the group has committed offsets, this setting is ignored and consumption resumes from the committed position.


Consuming String Messages

@Component
@Slf4j
public class OrderEventConsumer {

    @KafkaListener(topics = "orders", groupId = "inventory-service")
    public void onOrderEvent(String message) {
        log.info("Received: {}", message);
        // process message
    }
}

Listener support is enabled by @EnableKafka. Spring Boot's auto-configuration applies it for you, so declaring it explicitly is only necessary in a plain Spring application:

@SpringBootApplication
@EnableKafka  // usually auto-enabled by Spring Boot's auto-configuration
public class InventoryServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(InventoryServiceApplication.class, args);
    }
}

Consuming JSON (POJO) Messages

Configure JSON deserialization:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.events
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.events.OrderPlacedEvent

@Component
@Slf4j
public class OrderEventConsumer {

    private final InventoryService inventoryService;

    public OrderEventConsumer(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    @KafkaListener(topics = "orders", groupId = "inventory-service")
    public void onOrderPlaced(OrderPlacedEvent event) {
        log.info("Processing order: orderId={} customerId={} total={}",
            event.orderId(), event.customerId(), event.total());
        inventoryService.reserveStock(event);
    }
}

Accessing Record Metadata

Inject partition, offset, key, timestamp, and headers alongside the payload:

@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(
        @Payload OrderPlacedEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Header(value = "correlationId", required = false) String correlationId) {

    log.info("[{}] order={} partition={} offset={} ts={}",
        correlationId, event.orderId(), partition, offset, timestamp);
    inventoryService.reserveStock(event);
}

Or receive the entire ConsumerRecord for full access:

@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(ConsumerRecord<String, OrderPlacedEvent> record) {
    String key = record.key();
    OrderPlacedEvent event = record.value();
    int partition = record.partition();
    long offset = record.offset();
    Headers headers = record.headers();

    log.info("key={} partition={} offset={}", key, partition, offset);
    inventoryService.reserveStock(event);
}

Listening to Multiple Topics

// Multiple topics in one listener
@KafkaListener(topics = {"orders", "order-updates"}, groupId = "inventory-service")
public void onOrderEvent(ConsumerRecord<String, String> record) {
    log.info("Topic: {} key: {}", record.topic(), record.key());
}

// Topic pattern (regex) — matches any topic starting with "order"
@KafkaListener(topicPattern = "order.*", groupId = "inventory-service")
public void onAnyOrderTopic(ConsumerRecord<String, String> record) {
    log.info("Topic: {}", record.topic());
}

Batch Listening

Instead of receiving one record at a time, receive an entire poll batch:

spring.kafka.listener.type=batch

@KafkaListener(topics = "orders", groupId = "inventory-service-batch")
public void onOrderBatch(List<OrderPlacedEvent> events) {
    log.info("Processing batch of {} orders", events.size());
    inventoryService.reserveStockBatch(events);
}

With full metadata per record in batch mode, plus manual acknowledgment via the Acknowledgment parameter:

@KafkaListener(topics = "orders", groupId = "inventory-service-batch")
public void onOrderBatch(
        List<ConsumerRecord<String, OrderPlacedEvent>> records,
        Acknowledgment ack) {

    log.info("Batch size: {}", records.size());
    for (ConsumerRecord<String, OrderPlacedEvent> record : records) {
        log.info("  offset={} key={}", record.offset(), record.key());
        inventoryService.reserveStock(record.value());
    }
    ack.acknowledge();
}
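The ack.acknowledge() call above only takes effect when offset commits are switched to manual acknowledgment; in Spring Boot that is done with:

```properties
spring.kafka.listener.ack-mode=manual
```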

The Consumer Thread Model

flowchart TB
    subgraph ConsumerGroup["Consumer Group: inventory-service"]
        subgraph Instance1["App Instance 1 (concurrency=2)"]
            T1["Thread 1 → Partition 0"]
            T2["Thread 2 → Partition 1"]
        end
        subgraph Instance2["App Instance 2 (concurrency=2)"]
            T3["Thread 3 → Partition 2"]
            T4["Thread 4 → Partition 3"]
        end
    end

    subgraph Topic["Topic: orders (4 partitions)"]
        P0["Partition 0"]
        P1["Partition 1"]
        P2["Partition 2"]
        P3["Partition 3"]
    end

    P0 --> T1
    P1 --> T2
    P2 --> T3
    P3 --> T4

Each thread polls its assigned partition(s) independently. concurrency sets the number of threads per application instance. Set it to at most the number of partitions assigned to the instance — a partition is consumed by only one thread at a time, so any threads beyond that sit idle.

spring.kafka.listener.concurrency=3

Or per-listener:

@KafkaListener(topics = "orders", groupId = "inventory-service", concurrency = "3")
public void onOrderPlaced(OrderPlacedEvent event) { ... }

The Poll Loop Internals

sequenceDiagram
    participant Thread as Consumer Thread
    participant Broker as Kafka Broker
    participant Method as @KafkaListener Method

    loop Every poll cycle
        Thread->>Broker: poll(max.poll.records=500, timeout=5000ms)
        Broker-->>Thread: Up to 500 records

        loop For each record in batch
            Thread->>Method: invoke(record)
            Method-->>Thread: return (or throw)
        end

        Thread->>Broker: commitOffset (if auto-commit)
        Note over Thread: Heartbeats sent by a background thread\n(poll interval tracked separately)
    end

Key settings:

| Property | Default | Controls |
| --- | --- | --- |
| max.poll.records | 500 | Max records returned per poll |
| max.poll.interval.ms | 300,000 (5 min) | Max time between polls before the consumer is considered dead |
| session.timeout.ms | 45,000 (45 sec) | Time without a heartbeat before the coordinator removes the consumer |
| heartbeat.interval.ms | 3,000 (3 sec) | How often the consumer sends heartbeats |

If your @KafkaListener method takes longer than max.poll.interval.ms to process a full batch, the consumer is kicked out of the group and a rebalance occurs. Either reduce max.poll.records or increase max.poll.interval.ms.

spring.kafka.consumer.properties.max.poll.records=50
spring.kafka.consumer.properties.max.poll.interval.ms=600000

@KafkaListener on the Class Level

Instead of one method, use a class with @KafkaHandler methods — one per message type:

@Component
@KafkaListener(topics = "orders", groupId = "inventory-service")
public class OrderEventDispatcher {

    @KafkaHandler
    public void onOrderPlaced(OrderPlacedEvent event) {
        log.info("OrderPlaced: {}", event.orderId());
    }

    @KafkaHandler
    public void onOrderCancelled(OrderCancelledEvent event) {
        log.info("OrderCancelled: {}", event.orderId());
    }

    @KafkaHandler(isDefault = true)
    public void onUnknownEvent(Object event) {
        log.warn("Unknown event type: {}", event.getClass().getSimpleName());
    }
}

Spring Kafka dispatches to the right method based on the __TypeId__ header (added by JsonSerializer) or the payload type. Covered in depth in @SendTo and @KafkaHandler.
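Per-type dispatch relies on the producer writing that header. With Spring Boot, it is added automatically when the producer side serializes with JsonSerializer (type-info headers are on by default):

```properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```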


Starting and Stopping Listeners

By default all listeners start automatically. Control lifecycle explicitly:

@Autowired
private KafkaListenerEndpointRegistry registry;

// Stop a specific listener
public void stopInventoryListener() {
    registry.getListenerContainer("inventory-listener").stop();
}

// Start it again
public void startInventoryListener() {
    registry.getListenerContainer("inventory-listener").start();
}

Assign an explicit ID to reference it:

@KafkaListener(id = "inventory-listener", topics = "orders", groupId = "inventory-service")
public void onOrderPlaced(OrderPlacedEvent event) { ... }
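To keep a listener registered but stopped at boot — starting it later through the registry — set the autoStartup attribute:

```java
@KafkaListener(
    id = "inventory-listener",
    topics = "orders",
    groupId = "inventory-service",
    autoStartup = "false"  // registered with the registry, but not started until start() is called
)
public void onOrderPlaced(OrderPlacedEvent event) { ... }
```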

Complete Inventory Service Consumer

@Component
@Slf4j
public class InventoryConsumer {

    private final InventoryService inventoryService;

    public InventoryConsumer(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    @KafkaListener(
        id = "inventory-orders-listener",
        topics = "orders",
        groupId = "inventory-service",
        concurrency = "3"
    )
    public void onOrderPlaced(
            @Payload OrderPlacedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {

        log.info("[inventory] Processing orderId={} partition={} offset={}",
            event.orderId(), partition, offset);

        try {
            inventoryService.reserveStock(event);
        } catch (InsufficientStockException e) {
            log.warn("[inventory] Insufficient stock for orderId={}: {}",
                event.orderId(), e.getMessage());
            // publish StockUnavailableEvent back to Kafka
        }
    }
}

Key Takeaways

  • @KafkaListener registers a method as a Kafka consumer — Spring Kafka manages the poll loop, deserialization, and thread lifecycle
  • Use @Payload + @Header to access the payload and record metadata separately, or inject ConsumerRecord<K,V> for full access
  • concurrency controls threads per listener — set it to match the number of partitions assigned to this instance
  • auto-offset-reset=earliest starts new consumer groups from the beginning; latest only receives new messages
  • Batch mode (spring.kafka.listener.type=batch) delivers a List<> of records per poll cycle — useful for bulk database operations
  • If processing takes longer than max.poll.interval.ms, the consumer is evicted; reduce max.poll.records or increase the interval
  • Assign an id to each listener to reference it in KafkaListenerEndpointRegistry for lifecycle control

Next: Consumer Groups: Parallel Processing and Partition Assignment Strategies — understand how Kafka assigns partitions to consumers, the different assignor strategies, and how to scale consumers.