# Monitoring: Consumer Lag, Micrometer Metrics, and Actuator Integration

## What to Monitor in Kafka

Production Kafka applications need visibility into:

- Consumer lag — how many records are unprocessed per partition
- Throughput — records produced and consumed per second
- Error rates — listener exceptions, DLT records, retry counts
- Producer latency — time from `send()` to broker acknowledgment
- Rebalance frequency — a high rebalance rate signals consumer instability
## Dependencies

```xml
<!-- Micrometer Prometheus registry -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- Spring Boot Actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
```
## Auto-Configured Kafka Metrics

Spring Boot auto-configures Kafka metrics via Micrometer when both `spring-kafka` and a Micrometer registry are on the classpath. No additional code is needed.
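Under the hood, Boot attaches Micrometer listeners to the default producer and consumer factories. If you define your own factory, you can wire the listener yourself. A minimal sketch (the `consumerProps()` helper is a placeholder for your consumer configuration):

```java
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;

@Bean
public ConsumerFactory<String, String> consumerFactory(MeterRegistry registry) {
    DefaultKafkaConsumerFactory<String, String> factory =
            new DefaultKafkaConsumerFactory<>(consumerProps());
    // Registers each created consumer's native Kafka metrics with Micrometer
    factory.addListener(new MicrometerConsumerListener<>(registry));
    return factory;
}
```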
Enable the Actuator and Prometheus endpoint:
management.endpoints.web.exposure.include=health,info,prometheus,metrics
management.endpoint.health.show-details=always
management.metrics.export.prometheus.enabled=true
Access metrics at http://localhost:8080/actuator/prometheus.
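The scrape endpoint serves the Kafka client metrics in Prometheus exposition format; an excerpt (values and tags illustrative) looks like:

```text
# HELP kafka_consumer_fetch_manager_records_lag The latest lag of the partition
# TYPE kafka_consumer_fetch_manager_records_lag gauge
kafka_consumer_fetch_manager_records_lag{client_id="order-service-1",partition="0",topic="orders",} 14.0
```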
## Key Kafka Metrics (Micrometer Names)

### Consumer Metrics

| Metric | Description |
|---|---|
| `kafka.consumer.fetch.manager.records.lag` | Records behind the high-water mark per partition |
| `kafka.consumer.fetch.manager.records.consumed.rate` | Records consumed per second |
| `kafka.consumer.fetch.manager.fetch.rate` | Fetch requests per second |
| `kafka.consumer.coordinator.rebalance.rate.per.hour` | Rebalances per hour |
| `kafka.consumer.coordinator.join.rate` | Group joins per second |

### Producer Metrics

| Metric | Description |
|---|---|
| `kafka.producer.record.send.rate` | Records sent per second |
| `kafka.producer.record.error.rate` | Errors per second |
| `kafka.producer.request.latency.avg` | Average request latency (ms) |
| `kafka.producer.buffer.available.bytes` | Available producer buffer bytes |
## Consumer Lag — The Most Important Metric

Consumer lag is the difference between the latest offset (high-water mark) and the committed offset for a partition. High lag means your consumers are falling behind.

```mermaid
flowchart LR
    subgraph Partition["orders — partition 0"]
        O0["0"] --> O1["1"] --> O98["..."] --> O99["offset 99 (HWM)"]
    end
    Committed["Committed offset: 85"]
    Lag["Lag = 99 - 85 = 14"]
    O99 -.-> Lag
    Committed -.-> Lag
```

The Micrometer metric `kafka.consumer.fetch.manager.records.lag` provides this per partition.
### Checking Lag via Actuator

```bash
curl http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag
```

```json
{
  "name": "kafka.consumer.fetch.manager.records.lag",
  "measurements": [{"statistic": "VALUE", "value": 14.0}],
  "availableTags": [
    {"tag": "client.id", "values": ["order-service-1"]},
    {"tag": "kafka.version", "values": ["3.7.0"]},
    {"tag": "topic", "values": ["orders"]},
    {"tag": "partition", "values": ["0"]}
  ]
}
```
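To read a single partition's value, the Actuator metrics endpoint accepts `tag` query parameters:

```bash
curl "http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag?tag=topic:orders&tag=partition:0"
```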
### Checking Lag via AdminClient

For lag queries outside the consumer process (e.g., in a monitoring service):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ConsumerLagService {

    private final AdminClient adminClient; // bean definition shown below

    public Map<TopicPartition, Long> getLag(String groupId, String topic) throws Exception {
        // Committed offsets for the consumer group
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get(10, TimeUnit.SECONDS);

        // Restrict to the requested topic's partitions
        Set<TopicPartition> partitions = committedOffsets.keySet().stream()
                .filter(tp -> tp.topic().equals(topic))
                .collect(Collectors.toSet());

        // End offsets (high-water marks) for those partitions
        Map<TopicPartition, Long> endOffsets = adminClient
                .listOffsets(partitions.stream().collect(
                        Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())
                ))
                .all()
                .get(10, TimeUnit.SECONDS)
                .entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));

        // Lag = end offset - committed offset, per partition
        return partitions.stream().collect(Collectors.toMap(
                tp -> tp,
                tp -> endOffsets.getOrDefault(tp, 0L) -
                        committedOffsets.getOrDefault(tp, new OffsetAndMetadata(0)).offset()
        ));
    }
}
```
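One detail worth noting: Spring Boot auto-configures a `KafkaAdmin` bean, not an `AdminClient`, so the service above needs the client defined explicitly. A minimal sketch reusing the auto-configured connection properties:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;

// Closed automatically when the application context shuts down
@Bean(destroyMethod = "close")
public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
    return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
```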
## Custom Metrics with MeterRegistry

Track business-level events alongside the Kafka infrastructure metrics:

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderEventListener {

    private final MeterRegistry meterRegistry;
    private final InventoryService inventoryService;

    @KafkaListener(topics = "orders", groupId = "inventory-service")
    public void onOrder(OrderPlacedEvent event, Acknowledgment ack) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            inventoryService.reserveStock(event);
            meterRegistry.counter("orders.processed",
                    "status", "success",
                    "region", event.getRegion()
            ).increment();
            ack.acknowledge();
        } catch (Exception e) {
            meterRegistry.counter("orders.processed",
                    "status", "failure",
                    "exception", e.getClass().getSimpleName()
            ).increment();
            throw e; // let the configured error handler deal with it
        } finally {
            // Records processing latency whether the listener succeeded or failed
            sample.stop(meterRegistry.timer("orders.processing.time",
                    "region", event.getRegion()));
        }
    }
}
```
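One caveat: a Micrometer `Timer` publishes count, sum, and max by default. For the `histogram_quantile` query used later in this section, enable a percentile histogram for the timer (property keyed by the metric name above):

```properties
management.metrics.distribution.percentiles-histogram.orders.processing.time=true
```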
## Prometheus Scrape Config

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'order-service'
    scrape_interval: 15s
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['order-service:8080']
```
## Grafana Dashboard Queries

Key PromQL queries for a Kafka consumer dashboard:

```promql
# Consumer lag by partition
kafka_consumer_fetch_manager_records_lag{job="order-service"}

# Records consumed per second
rate(kafka_consumer_fetch_manager_records_consumed_total{job="order-service"}[1m])

# Producer error rate
rate(kafka_producer_record_error_total{job="order-service"}[1m])

# Processing time (p99); requires the percentile histogram enabled above
histogram_quantile(0.99,
  rate(orders_processing_time_seconds_bucket{job="order-service"}[5m])
)

# Orders processed per second
rate(orders_processed_total{job="order-service", status="success"}[1m])
```
## Alerting on Consumer Lag

Prometheus alert rules — fire if any partition has lag > 1000 for more than 5 minutes, and on any producer errors:

```yaml
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerHighLag
        expr: kafka_consumer_fetch_manager_records_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag on {{ $labels.topic }}-{{ $labels.partition }}"
          description: "Lag is {{ $value }} records"

      - alert: KafkaProducerErrors
        expr: rate(kafka_producer_record_error_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka producer errors on {{ $labels.client_id }}"
```
## Spring Boot Actuator Health Check

Spring Boot does not ship a Kafka health indicator out of the box (one appeared in the 2.0 milestones and was removed before GA), but a small custom `HealthIndicator` backed by the `AdminClient` fills the gap. A minimal sketch, reusing the `AdminClient` bean defined earlier; the detail keys and 5-second timeout are illustrative:
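```java
import java.util.concurrent.TimeUnit;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

// Exposed as the "kafka" component (bean name minus the HealthIndicator suffix)
@Component
@RequiredArgsConstructor
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    @Override
    public Health health() {
        try {
            DescribeClusterResult cluster = adminClient.describeCluster();
            return Health.up()
                    .withDetail("clusterId", cluster.clusterId().get(5, TimeUnit.SECONDS))
                    .withDetail("brokerId", cluster.controller().get(5, TimeUnit.SECONDS).idString())
                    .build();
        } catch (Exception e) {
            // Broker unreachable (or timed out) -> report DOWN with the cause
            return Health.down(e).build();
        }
    }
}
```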
With the indicator registered, the health endpoint reports broker status:

```bash
curl http://localhost:8080/actuator/health
```

```json
{
  "status": "UP",
  "components": {
    "kafka": {
      "status": "UP",
      "details": {
        "clusterId": "MkU3OEVBNTcwNTJENDM2Qk",
        "brokerId": "1",
        "kafkaVersion": "3.7.0-ce"
      }
    }
  }
}
```
The health check connects to the broker and returns DOWN if unreachable — wire this into your load balancer health checks to remove unhealthy instances from the rotation.
## Key Takeaways

- Micrometer auto-exposes Kafka consumer and producer metrics when `micrometer-registry-prometheus` is on the classpath — no code required
- Consumer lag (`kafka.consumer.fetch.manager.records.lag`) is the single most important metric — alert on it
- Use `AdminClient.listConsumerGroupOffsets()` + `listOffsets()` to query lag from outside the consumer process
- Add business metrics with `MeterRegistry` — `Counter` for throughput and status, `Timer` for latency
- Set a Prometheus alert on lag above a threshold for more than N minutes; alert on any producer error rate above 0
- A Kafka health indicator (custom, backed by `AdminClient`) checks broker connectivity — use it in liveness/readiness probes
Next: Spring Kafka Production Checklist and Best Practices — the complete pre-launch checklist: serialization, error handling, monitoring, security, and architectural guidance for production Kafka deployments.