Monitoring: Consumer Lag, Micrometer Metrics, and Actuator Integration

What to Monitor in Kafka

Production Kafka applications need visibility into:

  • Consumer lag — how many records are unprocessed per partition
  • Throughput — records produced and consumed per second
  • Error rates — listener exceptions, DLT records, retry counts
  • Producer latency — time from send() to broker acknowledgment
  • Rebalance frequency — high rebalance rate signals consumer instability

Dependencies

<!-- Micrometer Prometheus registry -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Spring Boot Actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Auto-Configured Kafka Metrics

Spring Boot auto-configures Kafka metrics via Micrometer when both spring-kafka and a Micrometer registry are on the classpath. No additional code needed.

Enable the Actuator and Prometheus endpoint:

management.endpoints.web.exposure.include=health,info,prometheus,metrics
management.endpoint.health.show-details=always
# Spring Boot 3.x property name (Boot 2.x used management.metrics.export.prometheus.enabled)
management.prometheus.metrics.export.enabled=true

Access metrics at http://localhost:8080/actuator/prometheus.


Key Kafka Metrics (Micrometer Names)

Consumer Metrics

  • kafka.consumer.fetch.manager.records.lag — Records behind the high-water mark per partition
  • kafka.consumer.fetch.manager.records.consumed.rate — Records consumed per second
  • kafka.consumer.fetch.manager.fetch.rate — Fetch requests per second
  • kafka.consumer.coordinator.rebalance.rate.per.hour — Rebalances per hour
  • kafka.consumer.coordinator.join.rate — Group joins per second

Producer Metrics

  • kafka.producer.record.send.rate — Records sent per second
  • kafka.producer.record.error.rate — Errors per second
  • kafka.producer.request.latency.avg — Average request latency (ms)
  • kafka.producer.buffer.available.bytes — Available producer buffer bytes

Consumer Lag — The Most Important Metric

Consumer lag is the difference between the latest offset (high-water mark) and the committed offset for a partition. High lag means your consumers are falling behind.

flowchart LR
    subgraph Partition["orders — partition 0"]
        O0["0"] --> O1["1"] --> O98["..."] --> O99["offset 99 (HWM)"]
    end
    Committed["Committed offset: 85"]
    Lag["Lag = 99 - 85 = 14"]

    O99 -.-> Lag
    Committed -.-> Lag

The Micrometer metric kafka.consumer.fetch.manager.records.lag provides this per-partition.
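The arithmetic behind the metric is just the difference between the two offsets in the diagram; a minimal sketch in plain Java, using the diagram's numbers:

```java
// Lag per partition = high-water mark minus committed offset
// (clamped at zero, since a freshly caught-up consumer has no lag).
public class LagExample {

    static long lag(long highWaterMark, long committedOffset) {
        return Math.max(0, highWaterMark - committedOffset);
    }

    public static void main(String[] args) {
        // Numbers from the diagram above: HWM 99, committed offset 85
        System.out.println(lag(99, 85)); // prints 14
    }
}
```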


Checking Lag via Actuator

curl http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag

{
  "name": "kafka.consumer.fetch.manager.records.lag",
  "measurements": [{"statistic": "VALUE", "value": 14.0}],
  "availableTags": [
    {"tag": "client.id", "values": ["order-service-1"]},
    {"tag": "kafka.version", "values": ["3.7.0"]},
    {"tag": "topic", "values": ["orders"]},
    {"tag": "partition", "values": ["0"]}
  ]
}
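The Actuator metrics endpoint also accepts tag filters, which narrows the reading to a single topic and partition (the values here are the ones from the example response above):

```shell
# Drill into one partition using Actuator's tag query syntax (tag=name:value)
curl "http://localhost:8080/actuator/metrics/kafka.consumer.fetch.manager.records.lag?tag=topic:orders&tag=partition:0"
```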

Checking Lag via AdminClient

For lag queries outside the consumer process (e.g., in a monitoring service):

@Service
@RequiredArgsConstructor
public class ConsumerLagService {

    private final AdminClient adminClient;

    public Map<TopicPartition, Long> getLag(String groupId, String topic) throws Exception {
        // Get committed offsets for the group
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = adminClient
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get(10, TimeUnit.SECONDS);

        // Get end offsets (HWM) for the topic's partitions
        Set<TopicPartition> partitions = committedOffsets.keySet().stream()
            .filter(tp -> tp.topic().equals(topic))
            .collect(Collectors.toSet());

        Map<TopicPartition, Long> endOffsets = adminClient
            .listOffsets(partitions.stream().collect(
                Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())
            ))
            .all()
            .get(10, TimeUnit.SECONDS)
            .entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));

        // Calculate lag per partition
        return partitions.stream().collect(Collectors.toMap(
            tp -> tp,
            tp -> endOffsets.getOrDefault(tp, 0L) -
                  committedOffsets.getOrDefault(tp, new OffsetAndMetadata(0)).offset()
        ));
    }
}
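Spring Boot auto-configures a KafkaAdmin bean but not an AdminClient, so the service above needs one defined explicitly. A minimal wiring sketch, reusing the connection settings Boot already applied to KafkaAdmin:

```java
// Sketch: expose an AdminClient bean for lag queries. Spring Boot provides
// KafkaAdmin out of the box but leaves AdminClient creation to you.
@Configuration
public class AdminClientConfiguration {

    @Bean(destroyMethod = "close") // close the client cleanly on shutdown
    public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}
```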

Custom Metrics with MeterRegistry

Track business-level events alongside Kafka infrastructure metrics:

@Component
@RequiredArgsConstructor
public class OrderEventListener {

    private final MeterRegistry meterRegistry;
    private final InventoryService inventoryService;

    @KafkaListener(topics = "orders", groupId = "inventory-service")
    public void onOrder(OrderPlacedEvent event, Acknowledgment ack) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            inventoryService.reserveStock(event);
            meterRegistry.counter("orders.processed",
                "status", "success",
                "region", event.getRegion()
            ).increment();
            ack.acknowledge();
        } catch (Exception e) {
            meterRegistry.counter("orders.processed",
                "status", "failure",
                "exception", e.getClass().getSimpleName()
            ).increment();
            throw e;
        } finally {
            sample.stop(meterRegistry.timer("orders.processing.time",
                "region", event.getRegion()));
        }
    }
}
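Lag computed via the AdminClient approach shown earlier can also be fed into Micrometer, so externally derived lag lands in the same Prometheus scrape. A sketch using Micrometer's MultiGauge (the group/topic names, the metric name kafka.group.lag, and the 30-second interval are illustrative; @EnableScheduling must be active):

```java
// Sketch: publish AdminClient-derived lag as per-partition Micrometer gauges.
// ConsumerLagService is the service shown above; all names are assumptions.
@Component
public class LagGaugePublisher {

    private final ConsumerLagService lagService;
    private final MultiGauge lagGauge;

    public LagGaugePublisher(ConsumerLagService lagService, MeterRegistry registry) {
        this.lagService = lagService;
        this.lagGauge = MultiGauge.builder("kafka.group.lag")
            .description("Consumer group lag per partition (AdminClient-derived)")
            .register(registry);
    }

    @Scheduled(fixedDelay = 30_000)
    public void publish() throws Exception {
        Map<TopicPartition, Long> lag = lagService.getLag("inventory-service", "orders");
        lagGauge.register(
            lag.entrySet().stream()
                .map(e -> MultiGauge.Row.of(
                    Tags.of("topic", e.getKey().topic(),
                            "partition", String.valueOf(e.getKey().partition())),
                    e.getValue()))
                .toList(),
            true); // overwrite rows from the previous tick
    }
}
```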

Prometheus Scrape Config

# prometheus.yml
scrape_configs:
  - job_name: 'order-service'
    scrape_interval: 15s
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['order-service:8080']

Grafana Dashboard Queries

Key PromQL queries for a Kafka consumer dashboard:

# Consumer lag by partition
kafka_consumer_fetch_manager_records_lag{job="order-service"}

# Records consumed per second
rate(kafka_consumer_fetch_manager_records_consumed_total{job="order-service"}[1m])

# Producer error rate
rate(kafka_producer_record_error_total{job="order-service"}[1m])

# Processing time (p99) — requires histogram buckets to be published for the timer
histogram_quantile(0.99,
  rate(orders_processing_time_seconds_bucket{job="order-service"}[5m])
)

# Orders processed per second
rate(orders_processed_total{job="order-service", status="success"}[1m])
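The p99 query only returns data if the orders.processing.time timer publishes histogram buckets, which Micrometer timers do not do by default. Spring Boot can enable this per meter:

```properties
# Publish histogram buckets for the custom timer so histogram_quantile() works
management.metrics.distribution.percentiles-histogram.orders.processing.time=true
```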

Alerting on Consumer Lag

Prometheus alert rule — fire if any partition has lag > 1000 for more than 5 minutes:

groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerHighLag
        expr: kafka_consumer_fetch_manager_records_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag on {{ $labels.topic }}-{{ $labels.partition }}"
          description: "Lag is {{ $value }} records"

      - alert: KafkaProducerErrors
        expr: rate(kafka_producer_record_error_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka producer errors on {{ $labels.client_id }}"

Spring Boot Actuator Health Check

Spring Boot does not auto-configure a Kafka health indicator (the built-in one was removed before the 2.0 release because it could not reliably determine broker health), so expose broker connectivity through a custom HealthIndicator; once registered, it appears under the kafka component:
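A minimal custom indicator can be built on AdminClient.describeCluster(); a sketch, with illustrative 2-second timeouts:

```java
// Sketch of a custom Kafka HealthIndicator backed by AdminClient.
// Reports UP with cluster details, or DOWN if the broker is unreachable.
@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    public KafkaHealthIndicator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public Health health() {
        try {
            DescribeClusterResult cluster = adminClient.describeCluster(
                new DescribeClusterOptions().timeoutMs(2_000));
            return Health.up()
                .withDetail("clusterId", cluster.clusterId().get(2, TimeUnit.SECONDS))
                .withDetail("brokerId", cluster.controller().get(2, TimeUnit.SECONDS).idString())
                .build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
```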

curl http://localhost:8080/actuator/health

{
  "status": "UP",
  "components": {
    "kafka": {
      "status": "UP",
      "details": {
        "clusterId": "MkU3OEVBNTcwNTJENDM2Qk",
        "brokerId": "1",
        "kafkaVersion": "3.7.0-ce"
      }
    }
  }
}

The health check connects to the broker and returns DOWN if unreachable — wire this into your load balancer health checks to remove unhealthy instances from the rotation.


Key Takeaways

  • Micrometer auto-exposes Kafka consumer and producer metrics when micrometer-registry-prometheus is on the classpath — no code required
  • Consumer lag (kafka.consumer.fetch.manager.records.lag) is the single most important metric — alert on it
  • Use AdminClient.listConsumerGroupOffsets() + listOffsets() to query lag from outside the consumer process
  • Add business metrics with MeterRegistry — Counter for throughput and status, Timer for latency
  • Set a Prometheus alert on lag > threshold for > N minutes; alert on any producer error rate > 0
  • A custom Actuator HealthIndicator backed by AdminClient checks broker connectivity — use it in liveness/readiness probes

Next: Spring Kafka Production Checklist and Best Practices — the complete pre-launch checklist: serialization, error handling, monitoring, security, and architectural guidance for production Kafka deployments.