Pausing, Resuming, and Stopping Listener Containers

Why Control Container Lifecycle?

A running listener consumes from Kafka continuously. In production you need to:

  • Pause consumption when a downstream service is overloaded (back-pressure)
  • Resume once the downstream recovers
  • Stop a container entirely during maintenance or feature flag toggles
  • Restart after a configuration change without redeploying

Spring Kafka exposes all of this through KafkaListenerEndpointRegistry and the container’s own lifecycle API.


Container States

stateDiagram-v2
    [*] --> Running : start()
    Running --> Paused : pause()
    Paused --> Running : resume()
    Running --> Stopped : stop()
    Stopped --> Running : start()
    Paused --> Stopped : stop()

  • Running: polling Kafka and dispatching records to the listener
  • Paused: the consumer keeps polling and heartbeating, so it stays in the group, but its paused partitions return no records
  • Stopped: the consumer is closed and its thread ends; it leaves the group and its partitions are reassigned in a rebalance

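Paused is preferable to stopped for temporary throttling: it avoids a rebalance and keeps the consumer’s partition assignment intact. pause() is not instantaneous, though. It takes effect before the next poll, so records already returned by the current poll are still delivered to the listener. Since Spring Kafka 2.9, ContainerProperties.setPauseImmediate(true) makes the container pause after the current record instead.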
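To make the diagram concrete, here is a toy model of it in plain Java with no Spring Kafka dependency. The names ContainerLifecycle, State and Op are illustrative assumptions, and the model permits only the arrows drawn above; a real container treats some other calls, such as resume() on a running container, as no-ops rather than rejecting them.

```java
import java.util.EnumMap;
import java.util.Map;

// Toy model of the lifecycle diagram; NOT Spring Kafka's implementation.
// Only the drawn arrows are allowed; anything else is rejected.
class ContainerLifecycle {

    public enum State { STOPPED, RUNNING, PAUSED }

    public enum Op { START, PAUSE, RESUME, STOP }

    // One entry per arrow in the state diagram.
    private static final Map<State, Map<Op, State>> TRANSITIONS = new EnumMap<>(State.class);

    static {
        for (State s : State.values()) {
            TRANSITIONS.put(s, new EnumMap<>(Op.class));
        }
        TRANSITIONS.get(State.STOPPED).put(Op.START, State.RUNNING);
        TRANSITIONS.get(State.RUNNING).put(Op.PAUSE, State.PAUSED);
        TRANSITIONS.get(State.RUNNING).put(Op.STOP, State.STOPPED);
        TRANSITIONS.get(State.PAUSED).put(Op.RESUME, State.RUNNING);
        TRANSITIONS.get(State.PAUSED).put(Op.STOP, State.STOPPED);
    }

    private State state = State.STOPPED;

    public State state() {
        return state;
    }

    // Applies the operation if the diagram has an arrow for it;
    // otherwise returns false and leaves the state unchanged.
    public boolean apply(Op op) {
        State next = TRANSITIONS.get(state).get(op);
        if (next == null) {
            return false;
        }
        state = next;
        return true;
    }

    public static void main(String[] args) {
        ContainerLifecycle c = new ContainerLifecycle();
        c.apply(Op.START);  // STOPPED -> RUNNING
        c.apply(Op.PAUSE);  // RUNNING -> PAUSED: assignment kept, no rebalance
        c.apply(Op.STOP);   // PAUSED -> STOPPED: partitions released
        System.out.println(c.state()); // prints STOPPED
    }
}
```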


KafkaListenerEndpointRegistry

Spring creates a KafkaListenerEndpointRegistry when @EnableKafka is active (Spring Boot’s auto-configuration enables it) and registers every @KafkaListener container in it. Inject it into any Spring bean:

@Service
@RequiredArgsConstructor
public class ListenerControlService {

    private final KafkaListenerEndpointRegistry registry;

    public void pauseOrderListener() {
        registry.getListenerContainer("order-listener").pause();
    }

    public void resumeOrderListener() {
        registry.getListenerContainer("order-listener").resume();
    }

    public void stopOrderListener() {
        registry.getListenerContainer("order-listener").stop();
    }

    public void startOrderListener() {
        registry.getListenerContainer("order-listener").start();
    }
}

The string "order-listener" must match the id on the @KafkaListener:

@KafkaListener(
    id = "order-listener",
    topics = "orders",
    groupId = "order-service"
)
public void onOrder(OrderPlacedEvent event) {
    orderService.process(event);
}

Listing All Containers

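registry.getListenerContainerIds().forEach(id -> {
    MessageListenerContainer container = registry.getListenerContainer(id);
    // isPauseRequested(): pause() has been called; isContainerPaused(): all consumers have actually paused
    log.info("id={} running={} pauseRequested={} paused={}",
        id, container.isRunning(), container.isPauseRequested(), container.isContainerPaused());
});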

REST-Controlled Pause/Resume

Expose pause and resume as admin endpoints:

@RestController
@RequestMapping("/admin/listeners")
@RequiredArgsConstructor
public class ListenerAdminController {

    private final KafkaListenerEndpointRegistry registry;

    @PostMapping("/{id}/pause")
    public ResponseEntity<String> pause(@PathVariable String id) {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (container == null) return ResponseEntity.notFound().build();
        container.pause();
        return ResponseEntity.ok("Paused: " + id);
    }

    @PostMapping("/{id}/resume")
    public ResponseEntity<String> resume(@PathVariable String id) {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (container == null) return ResponseEntity.notFound().build();
        container.resume();
        return ResponseEntity.ok("Resumed: " + id);
    }

    @GetMapping
    public Map<String, String> status() {
        Map<String, String> result = new LinkedHashMap<>();
        registry.getListenerContainerIds().forEach(id -> {
            var c = registry.getListenerContainer(id);
            // check isRunning() first so a stopped container is never reported as PAUSED
            result.put(id, !c.isRunning() ? "STOPPED" : c.isPauseRequested() ? "PAUSED" : "RUNNING");
        });
        return result;
    }
}
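The status mapping can also be factored into a pure function, which makes the order of the checks explicit and easy to unit-test. ListenerStatus.describe is a hypothetical helper whose three arguments are meant to be fed from a container's isRunning(), isPauseRequested() and isContainerPaused(); the extra PAUSE_REQUESTED value, for a pause that has not yet taken effect, is an addition of this sketch.

```java
// Hypothetical helper: the booleans stand for a container's isRunning(),
// isPauseRequested() and isContainerPaused() results.
final class ListenerStatus {

    private ListenerStatus() {
    }

    static String describe(boolean running, boolean pauseRequested, boolean consumersPaused) {
        if (!running) {
            return "STOPPED";         // checked first: a stopped container is never "paused"
        }
        if (pauseRequested && consumersPaused) {
            return "PAUSED";          // every consumer has actually paused
        }
        if (pauseRequested) {
            return "PAUSE_REQUESTED"; // pause() called; takes effect before the next poll
        }
        return "RUNNING";
    }

    public static void main(String[] args) {
        System.out.println(describe(true, true, false)); // prints PAUSE_REQUESTED
    }
}
```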

Circuit Breaker Pattern — Auto-Pause on Downstream Failure

Pause consumption when downstream calls fail repeatedly; resume when health checks pass:

@Slf4j
@Component
@RequiredArgsConstructor
public class InventoryCircuitBreaker {

    private final KafkaListenerEndpointRegistry registry;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private static final int FAILURE_THRESHOLD = 5;

    public void recordFailure() {
        int count = failureCount.incrementAndGet();
        if (count >= FAILURE_THRESHOLD) {
            log.warn("Failure threshold reached — pausing order-listener");
            registry.getListenerContainer("order-listener").pause();
        }
    }

    public void recordSuccess() {
        failureCount.set(0);
    }

    // @Scheduled requires @EnableScheduling on a configuration class
    @Scheduled(fixedDelay = 30_000)
    public void healthCheck() {
        var container = registry.getListenerContainer("order-listener");
        if (container.isPauseRequested() && inventoryServiceIsHealthy()) {
            log.info("Inventory service healthy — resuming order-listener");
            container.resume();
            failureCount.set(0);
        }
    }

    private boolean inventoryServiceIsHealthy() {
        // call health endpoint or check circuit breaker state
        return true;
    }
}

sequenceDiagram
    participant Listener
    participant CircuitBreaker
    participant Inventory
    participant Registry

    Listener->>Inventory: reserveStock(event)
    Inventory-->>Listener: InventoryUnavailableException
    Listener->>CircuitBreaker: recordFailure() [count=5]
    CircuitBreaker->>Registry: pause("order-listener")
    Note over Registry: Consumer maintains heartbeat\nNo new records fetched

    loop every 30s
        CircuitBreaker->>Inventory: health check
        Inventory-->>CircuitBreaker: 200 OK
        CircuitBreaker->>Registry: resume("order-listener")
    end
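The threshold logic itself does not need Spring. The sketch below isolates it behind a hypothetical Pausable interface (in the application it would wrap registry.getListenerContainer("order-listener")) and a BooleanSupplier standing in for inventoryServiceIsHealthy(); both are assumptions for illustration. Unlike the class above, it issues pause() only once per trip rather than on every failure past the threshold.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical abstraction over a listener container's pause()/resume().
interface Pausable {
    void pause();

    void resume();
}

// Framework-free sketch: pause on a failure threshold, resume on a passing health check.
class ConsumptionBreaker {

    private final Pausable container;
    private final BooleanSupplier downstreamHealthy; // assumed health probe
    private final int threshold;
    private final AtomicInteger failures = new AtomicInteger();
    private final AtomicBoolean paused = new AtomicBoolean();

    ConsumptionBreaker(Pausable container, BooleanSupplier downstreamHealthy, int threshold) {
        this.container = container;
        this.downstreamHealthy = downstreamHealthy;
        this.threshold = threshold;
    }

    void recordFailure() {
        // compareAndSet issues pause() once per trip instead of on every further failure
        if (failures.incrementAndGet() >= threshold && paused.compareAndSet(false, true)) {
            container.pause();
        }
    }

    void recordSuccess() {
        failures.set(0);
    }

    // Call periodically, e.g. from a scheduler every 30 seconds.
    void healthCheck() {
        if (paused.get() && downstreamHealthy.getAsBoolean()) {
            failures.set(0);
            if (paused.compareAndSet(true, false)) {
                container.resume();
            }
        }
    }

    boolean isPaused() {
        return paused.get();
    }
}
```

Because both dependencies are injected, the pause and resume behavior can be unit-tested with a fake Pausable and a toggled health flag, without a broker.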

Back-Pressure Pattern — Pause on Queue Depth

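Pause when an internal queue is full; resume when it drains. Because the listener returns as soon as a record is queued, the container can commit that record’s offset before the record is processed, so records still in the queue are lost if the application crashes.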

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderBackPressureListener {

    private final KafkaListenerEndpointRegistry registry;
    private final BlockingQueue<OrderPlacedEvent> processingQueue;

    @KafkaListener(id = "order-listener", topics = "orders")
    public void onOrder(OrderPlacedEvent event) throws InterruptedException {
        // put() waits for space instead of dropping the record, as a failed offer() would;
        // the processor must drain fast enough to stay within max.poll.interval.ms
        processingQueue.put(event);
        if (processingQueue.remainingCapacity() == 0) {
            // queue is now full: pause so the next poll fetches nothing while the processor drains it
            registry.getListenerContainer("order-listener").pause();
            log.warn("Processing queue full; paused consumption");
        }
    }

    // called by the downstream processor after each successful dequeue
    public void onProcessed() {
        // BlockingQueue has no capacity getter: capacity = size + remainingCapacity
        int size = processingQueue.size();
        int capacity = size + processingQueue.remainingCapacity();
        if (size < capacity / 2) {
            var container = registry.getListenerContainer("order-listener");
            if (container.isPauseRequested()) {
                container.resume();
                log.info("Queue drained below half capacity; resumed consumption");
            }
        }
    }
}
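The queue-depth logic can likewise be tested without Kafka. In this minimal sketch, WatermarkGate and its pauseAction/resumeAction hooks are hypothetical names; in the application the hooks would call the container's pause() and resume(). It pauses when the queue becomes full and resumes only once the queue is below half capacity, so the container does not flap between states on every record.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Framework-free sketch of queue-depth back-pressure with full / half-capacity hysteresis.
class WatermarkGate<T> {

    private final BlockingQueue<T> queue;
    private final int lowWatermark;
    private final Runnable pauseAction;  // hypothetical hook for container.pause()
    private final Runnable resumeAction; // hypothetical hook for container.resume()
    private final AtomicBoolean paused = new AtomicBoolean();

    WatermarkGate(BlockingQueue<T> queue, Runnable pauseAction, Runnable resumeAction) {
        this.queue = queue;
        // BlockingQueue has no capacity getter, but size + remainingCapacity equals it
        this.lowWatermark = (queue.size() + queue.remainingCapacity()) / 2;
        this.pauseAction = pauseAction;
        this.resumeAction = resumeAction;
    }

    // Listener side: never drops an item; pauses once the queue becomes full.
    void accept(T item) throws InterruptedException {
        queue.put(item);
        if (queue.remainingCapacity() == 0 && paused.compareAndSet(false, true)) {
            pauseAction.run();
        }
    }

    // Processor side: takes one item; resumes once the queue is below half capacity.
    T take() throws InterruptedException {
        T item = queue.take();
        if (queue.size() < lowWatermark && paused.compareAndSet(true, false)) {
            resumeAction.run();
        }
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        WatermarkGate<Integer> gate = new WatermarkGate<>(new ArrayBlockingQueue<>(4),
                () -> System.out.println("pause"), () -> System.out.println("resume"));
        for (int i = 0; i < 4; i++) {
            gate.accept(i); // the fourth put fills the queue and prints "pause"
        }
        for (int i = 0; i < 3; i++) {
            gate.take();    // the third take leaves 1 item (< 2) and prints "resume"
        }
    }
}
```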

Idle Event — Auto-Pause When Topic is Empty

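Listen for ListenerContainerIdleEvent, which is published when no records have arrived for idleEventInterval, and pause the idle container. A paused consumer fetches no records, so it never publishes ListenerContainerNoLongerIdleEvent; resume it from an independent trigger instead, such as a schedule or the admin endpoint above:

// on the ConcurrentKafkaListenerContainerFactory (Spring Boot: spring.kafka.listener.idle-event-interval=10s)
factory.getContainerProperties().setIdleEventInterval(10_000L);

// containers paused by this handler, so the scheduled resume leaves other pauses alone
private final Set<MessageListenerContainer> pausedOnIdle = ConcurrentHashMap.newKeySet();

@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    MessageListenerContainer container = event.getSource(MessageListenerContainer.class);
    if (pausedOnIdle.add(container)) {
        log.info("Container {} idle; pausing", event.getListenerId());
        container.pause();
    }
}

// ListenerContainerNoLongerIdleEvent cannot fire while paused, so resume on a schedule;
// if the topic is still empty, the next idle event pauses the container again
@Scheduled(fixedDelay = 60_000)
public void resumeIdleContainers() {
    pausedOnIdle.removeIf(container -> { container.resume(); return true; });
}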
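The idle and no-longer-idle events can be pictured with a small state model. This is a simplified assumption of the bookkeeping (an idle event once per interval without records, a no-longer-idle event on the first records afterwards), not Spring Kafka's source, and IdleTracker is a hypothetical name. The main method shows the consequence for a paused container: its polls return no records, so the model never leaves the idle state on its own.

```java
// Simplified model of idle detection; Spring Kafka's real bookkeeping differs in detail.
class IdleTracker {

    public enum Event { NONE, IDLE, NO_LONGER_IDLE }

    private final long idleIntervalMs;
    private long windowStart; // time of the last record arrival or last idle event
    private boolean idle;

    IdleTracker(long idleIntervalMs, long nowMs) {
        this.idleIntervalMs = idleIntervalMs;
        this.windowStart = nowMs;
    }

    // Call after every poll with the number of records that poll returned.
    Event afterPoll(int records, long nowMs) {
        if (records > 0) {
            boolean wasIdle = idle;
            idle = false;
            windowStart = nowMs;
            return wasIdle ? Event.NO_LONGER_IDLE : Event.NONE;
        }
        if (nowMs - windowStart >= idleIntervalMs) {
            idle = true;
            windowStart = nowMs; // repeat the idle event once per interval while idle
            return Event.IDLE;
        }
        return Event.NONE;
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(10_000, 0);
        // A paused consumer's polls always return 0 records...
        for (long t = 5_000; t <= 30_000; t += 5_000) {
            System.out.println(t + " " + tracker.afterPoll(0, t));
        }
        // ...so NO_LONGER_IDLE appears only once a poll returns records again.
        System.out.println(tracker.afterPoll(3, 31_000));
    }
}
```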

Graceful Shutdown

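Spring Kafka’s KafkaListenerEndpointRegistry is a SmartLifecycle bean, so when the application context closes it calls stop() on every container. By default a stopping container finishes the records returned by its last poll, commits pending offsets, and closes the consumer, which leaves the group and releases its partitions. ContainerProperties.setStopImmediate(true) (Spring Boot: spring.kafka.listener.immediate-stop=true) makes it stop after the current record instead.

How long shutdown waits is bounded by the container’s shutdownTimeout (10 seconds by default) and, during context shutdown, by Spring’s per-phase lifecycle timeout (spring.lifecycle.timeout-per-shutdown-phase in Spring Boot, 30 seconds by default). For long-running listeners, raise the container timeout on the listener container factory:

factory.getContainerProperties().setShutdownTimeout(30_000L);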

For a clean shutdown sequence:

sequenceDiagram
    participant App
    participant Registry
    participant Container
    participant Broker

    App->>Registry: stop() [JVM shutdown hook]
    Registry->>Container: stop()
    Container->>Container: finish records from the last poll
    Container->>Broker: commitOffset()
    Container->>Broker: leaveGroup()
    Broker->>Broker: rebalance remaining consumers
    Container-->>Registry: stopped
    Registry-->>App: all containers stopped
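The bounded wait has a close analogue in plain Java: ExecutorService.shutdown() followed by awaitTermination(). The sketch below is only that analogy, built around a hypothetical BoundedShutdown.stop helper; what happens after the timeout (here, shutdownNow() interrupts the worker) is a choice of this sketch, not a description of Spring Kafka's behavior.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of a bounded graceful stop; it is not Spring Kafka's code.
class BoundedShutdown {

    // Stops taking new work, waits up to the timeout for in-flight work, then interrupts.
    // Returns true if all work finished within the timeout.
    static boolean stop(ExecutorService worker, long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();                            // like stop(): no further polls
        if (worker.awaitTermination(timeout, unit)) { // like a shutdown timeout
            return true;                              // in-flight work finished cleanly
        }
        worker.shutdownNow();                         // give up: interrupt the worker thread
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> System.out.println("processing last record"));
        System.out.println("clean=" + stop(worker, 5, TimeUnit.SECONDS));
    }
}
```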

stop() vs pause() — When to Use Each

| Scenario | Use | Reason |
|---|---|---|
| Temporary overload (seconds to minutes) | pause() | Avoids a rebalance; heartbeats continue |
| Maintenance window (minutes to hours) | stop() | Releases partitions so other consumers can take over |
| Permanently disabling a feature | stop() | Clean exit from the consumer group |
| Circuit breaker / back-pressure | pause() | Fast to resume, no partition churn |

Key Takeaways

  • KafkaListenerEndpointRegistry is the central registry for all @KafkaListener containers — inject it anywhere
  • Give each listener a stable @KafkaListener(id = "...") so its container can be looked up by name; without one, Spring assigns a generated id
  • pause() maintains the broker connection and heartbeat — no rebalance, fast to resume
  • stop() terminates the consumer thread and releases partitions — triggers a group rebalance
  • Use the circuit breaker pattern (pause on failure threshold, resume on health check) to protect downstream services
  • Spring Kafka handles graceful shutdown automatically via SmartLifecycle; raise the container’s shutdownTimeout (and, if needed, Spring Boot’s spring.lifecycle.timeout-per-shutdown-phase) for long-running listeners

Next: JSON Serialization: JsonSerializer, JsonDeserializer, and Type Mapping — configure reliable JSON serialization with trusted packages, type headers, and polymorphic type mapping.