Pausing, Resuming, and Stopping Listener Containers
Why Control Container Lifecycle?
A running listener consumes from Kafka continuously. In production you need to:
- Pause consumption when a downstream service is overloaded (back-pressure)
- Resume once the downstream recovers
- Stop a container entirely during maintenance or feature flag toggles
- Restart after a configuration change without redeploying
Spring Kafka exposes all of this through KafkaListenerEndpointRegistry and the container’s own lifecycle API.
Container States
stateDiagram-v2
[*] --> Running : start()
Running --> Paused : pause()
Paused --> Running : resume()
Running --> Stopped : stop()
Stopped --> Running : start()
Paused --> Stopped : stop()
- Running — polling Kafka, dispatching to listener
- Paused — broker connection maintained, consumer heartbeat sent, no new records fetched
- Stopped — consumer thread terminated, partitions released back to group
Paused is preferable to stopped for temporary throttling: it avoids a rebalance and keeps the consumer’s partition assignment intact.
KafkaListenerEndpointRegistry
Spring auto-creates KafkaListenerEndpointRegistry and registers every @KafkaListener container in it. Inject it anywhere:
@Service
@RequiredArgsConstructor
public class ListenerControlService {
private final KafkaListenerEndpointRegistry registry;
public void pauseOrderListener() {
registry.getListenerContainer("order-listener").pause();
}
public void resumeOrderListener() {
registry.getListenerContainer("order-listener").resume();
}
public void stopOrderListener() {
registry.getListenerContainer("order-listener").stop();
}
public void startOrderListener() {
registry.getListenerContainer("order-listener").start();
}
}
The string "order-listener" must match the id on the @KafkaListener:
@KafkaListener(
id = "order-listener",
topics = "orders",
groupId = "order-service"
)
public void onOrder(OrderPlacedEvent event) {
orderService.process(event);
}
Listing All Containers
registry.getListenerContainerIds().forEach(id -> {
MessageListenerContainer container = registry.getListenerContainer(id);
log.info("id={} running={} paused={}",
id, container.isRunning(), container.isPaused());
});
REST-Controlled Pause/Resume
Expose pause and resume as admin endpoints:
@RestController
@RequestMapping("/admin/listeners")
@RequiredArgsConstructor
public class ListenerAdminController {
private final KafkaListenerEndpointRegistry registry;
@PostMapping("/{id}/pause")
public ResponseEntity<String> pause(@PathVariable String id) {
MessageListenerContainer container = registry.getListenerContainer(id);
if (container == null) return ResponseEntity.notFound().build();
container.pause();
return ResponseEntity.ok("Paused: " + id);
}
@PostMapping("/{id}/resume")
public ResponseEntity<String> resume(@PathVariable String id) {
MessageListenerContainer container = registry.getListenerContainer(id);
if (container == null) return ResponseEntity.notFound().build();
container.resume();
return ResponseEntity.ok("Resumed: " + id);
}
@GetMapping
public Map<String, String> status() {
Map<String, String> result = new LinkedHashMap<>();
registry.getListenerContainerIds().forEach(id -> {
var c = registry.getListenerContainer(id);
result.put(id, c.isPaused() ? "PAUSED" : c.isRunning() ? "RUNNING" : "STOPPED");
});
return result;
}
}
Circuit Breaker Pattern — Auto-Pause on Downstream Failure
Pause consumption when downstream calls fail repeatedly; resume when health checks pass:
@Component
@RequiredArgsConstructor
public class InventoryCircuitBreaker {
private final KafkaListenerEndpointRegistry registry;
private final AtomicInteger failureCount = new AtomicInteger(0);
private static final int FAILURE_THRESHOLD = 5;
public void recordFailure() {
int count = failureCount.incrementAndGet();
if (count >= FAILURE_THRESHOLD) {
log.warn("Failure threshold reached — pausing order-listener");
registry.getListenerContainer("order-listener").pause();
}
}
public void recordSuccess() {
failureCount.set(0);
}
@Scheduled(fixedDelay = 30_000)
public void healthCheck() {
var container = registry.getListenerContainer("order-listener");
if (container.isPaused() && inventoryServiceIsHealthy()) {
log.info("Inventory service healthy — resuming order-listener");
container.resume();
failureCount.set(0);
}
}
private boolean inventoryServiceIsHealthy() {
// call health endpoint or check circuit breaker state
return true;
}
}
sequenceDiagram
participant Listener
participant CircuitBreaker
participant Inventory
participant Registry
Listener->>Inventory: reserveStock(event)
Inventory-->>Listener: InventoryUnavailableException
Listener->>CircuitBreaker: recordFailure() [count=5]
CircuitBreaker->>Registry: pause("order-listener")
Note over Registry: Consumer maintains heartbeat\nNo new records fetched
loop every 30s
CircuitBreaker->>Inventory: health check
Inventory-->>CircuitBreaker: 200 OK
CircuitBreaker->>Registry: resume("order-listener")
end
Back-Pressure Pattern — Pause on Queue Depth
Pause when an internal queue is full; resume when it drains:
@Component
@RequiredArgsConstructor
public class OrderBackPressureListener {
private final KafkaListenerEndpointRegistry registry;
private final BlockingQueue<OrderPlacedEvent> processingQueue;
@KafkaListener(id = "order-listener", topics = "orders")
public void onOrder(OrderPlacedEvent event) {
if (!processingQueue.offer(event)) {
// queue full — pause and let the processor drain it
registry.getListenerContainer("order-listener").pause();
log.warn("Processing queue full — paused consumption");
}
}
// called by the downstream processor after each successful dequeue
public void onProcessed() {
if (processingQueue.size() < processingQueue.remainingCapacity() / 2) {
var container = registry.getListenerContainer("order-listener");
if (container.isPaused()) {
container.resume();
log.info("Queue drained below threshold — resumed consumption");
}
}
}
}
Idle Event — Auto-Pause When Topic is Empty
Listen for ListenerContainerIdleEvent and pause until new activity is detected:
factory.getContainerProperties().setIdleEventInterval(10_000L);
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
log.info("Container {} idle — pausing to save resources", event.getListenerId());
event.getSource(MessageListenerContainer.class).pause();
}
@EventListener
public void onNonIdle(ListenerContainerNoLongerIdleEvent event) {
event.getSource(MessageListenerContainer.class).resume();
}
Graceful Shutdown
Spring Kafka registers the KafkaListenerEndpointRegistry as a SmartLifecycle bean. On application shutdown, it calls stop() on all containers — waiting for in-flight records to finish processing before releasing partitions.
Control the maximum wait time:
spring.kafka.listener.shutdown-timeout=30s
For a clean shutdown sequence:
sequenceDiagram
participant App
participant Registry
participant Container
participant Broker
App->>Registry: stop() [JVM shutdown hook]
Registry->>Container: stop()
Container->>Container: finish in-flight record
Container->>Broker: commitOffset()
Container->>Broker: leaveGroup()
Broker->>Broker: rebalance remaining consumers
Container-->>Registry: stopped
Registry-->>App: all containers stopped
stop() vs pause() — When to Use Each
| Scenario | Use | Reason |
|---|---|---|
| Temporary overload (seconds to minutes) | pause() | Avoids rebalance, heartbeat maintained |
| Maintenance window (minutes to hours) | stop() | Releases partitions, lets other consumers take over |
| Permanently disabling a feature | stop() | Clean group exit |
| Circuit breaker / back-pressure | pause() | Fast to resume, no partition churn |
Key Takeaways
KafkaListenerEndpointRegistryis the central registry for all@KafkaListenercontainers — inject it anywhere@KafkaListener(id = "...")is required to look up a specific container by namepause()maintains the broker connection and heartbeat — no rebalance, fast to resumestop()terminates the consumer thread and releases partitions — triggers a group rebalance- Use the circuit breaker pattern (pause on failure threshold, resume on health check) to protect downstream services
- Spring Kafka handles graceful shutdown automatically via
SmartLifecycle; configureshutdown-timeoutfor long-running listeners
Next: JSON Serialization: JsonSerializer, JsonDeserializer, and Type Mapping — configure reliable JSON serialization with trusted packages, type headers, and polymorphic type mapping.