Dynamic Listener Containers and Programmatic Topic Registration
Why Dynamic Listeners?
@KafkaListener is declared at compile time. Some scenarios require listeners created at runtime:
- Multi-tenant SaaS: each tenant onboards to their own topic, and you can’t redeploy to add a @KafkaListener for each new tenant
- Feature flags: enable or disable a listener without a deployment
- Plugin systems: modules register their own topic subscriptions when loaded
- Admin APIs: operators subscribe to new topics via a REST endpoint
ConcurrentMessageListenerContainer
The core building block is ConcurrentMessageListenerContainer — the same class @KafkaListener uses internally, but constructed and started manually:
@Slf4j
@Service
@RequiredArgsConstructor
public class DynamicListenerService {

    private final ConcurrentKafkaListenerContainerFactory<String, Object> factory;

    // Running containers, keyed by topic
    private final Map<String, ConcurrentMessageListenerContainer<String, Object>> containers =
            new ConcurrentHashMap<>();

    public void startListener(String topic, String groupId,
                              MessageListener<String, Object> listener) {
        ConcurrentMessageListenerContainer<String, Object> container =
                factory.createContainer(topic);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(listener);

        // putIfAbsent makes the duplicate check and the registration one atomic step,
        // so two concurrent calls for the same topic cannot both start a container
        if (containers.putIfAbsent(topic, container) != null) {
            throw new IllegalStateException("Listener already registered for: " + topic);
        }
        container.start();
        log.info("Started dynamic listener for topic: {}", topic);
    }

    public void stopListener(String topic) {
        ConcurrentMessageListenerContainer<String, Object> container =
                containers.remove(topic);
        if (container != null) {
            container.stop();
            log.info("Stopped dynamic listener for topic: {}", topic);
        }
    }
}
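A service like this is typically called from request threads, so two calls for the same topic can arrive at the same time. The following is a minimal, Kafka-free sketch of an atomic register-then-start step. `Startable` and `TopicRegistry` are hypothetical names used only for illustration; `Startable` stands in for the real container and is not a Spring Kafka type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a listener container; not a Spring Kafka type. */
interface Startable {
    void start();
    void stop();
}

/** Keeps at most one started Startable per topic. */
class TopicRegistry {
    private final Map<String, Startable> byTopic = new ConcurrentHashMap<>();

    /** Registers and starts the container, or throws if the topic is already taken. */
    void register(String topic, Startable container) {
        // putIfAbsent checks for an existing entry and inserts in one atomic step
        if (byTopic.putIfAbsent(topic, container) != null) {
            throw new IllegalStateException("Listener already registered for: " + topic);
        }
        try {
            container.start();
        } catch (RuntimeException e) {
            // Roll back so a failed start does not block a later retry
            byTopic.remove(topic, container);
            throw e;
        }
    }

    /** Stops and removes the container; returns false if none was registered. */
    boolean unregister(String topic) {
        Startable container = byTopic.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    int size() {
        return byTopic.size();
    }
}
```

Registering before starting means a duplicate request fails fast without ever creating a second consumer, and the rollback in the catch block keeps the map consistent if the start itself fails.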
REST-Controlled Dynamic Subscription
@RestController
@RequestMapping("/admin/listeners")
@RequiredArgsConstructor
public class DynamicListenerController {

    private final DynamicListenerService listenerService;
    private final OrderEventProcessor processor;

    @PostMapping
    public ResponseEntity<String> subscribe(@RequestBody SubscribeRequest request) {
        listenerService.startListener(
                request.getTopic(),
                request.getGroupId(),
                record -> processor.process(record)
        );
        return ResponseEntity.ok("Subscribed to: " + request.getTopic());
    }

    @DeleteMapping("/{topic}")
    public ResponseEntity<String> unsubscribe(@PathVariable String topic) {
        listenerService.stopListener(topic);
        return ResponseEntity.ok("Unsubscribed from: " + topic);
    }
}
Multi-Tenant Example
Onboard a new tenant and start consuming their topic dynamically:
@Slf4j
@Service
@RequiredArgsConstructor
public class TenantOnboardingService {

    private final DynamicListenerService listenerService;
    private final KafkaAdmin kafkaAdmin;
    private final TenantEventProcessor processor;

    public void onboardTenant(String tenantId) {
        String topic = topicFor(tenantId);

        // Create the topic
        kafkaAdmin.createOrModifyTopics(
                TopicBuilder.name(topic).partitions(3).replicas(1).build()
        );

        // Start a dedicated listener
        listenerService.startListener(
                topic,
                "platform-order-service",
                record -> processor.process(tenantId, record.value())
        );
        log.info("Tenant {} onboarded, listening on {}", tenantId, topic);
    }

    public void offboardTenant(String tenantId) {
        listenerService.stopListener(topicFor(tenantId));
    }

    // Single place that defines the per-tenant topic naming convention
    private static String topicFor(String tenantId) {
        return "tenant-" + tenantId + "-orders";
    }
}
sequenceDiagram
participant Admin as "Admin REST"
participant Onboarding as "TenantOnboardingService"
participant KafkaAdmin
participant DynamicListener as "DynamicListenerService"
participant Broker
Admin->>Onboarding: POST /tenants/acme/onboard
Onboarding->>KafkaAdmin: create topic tenant-acme-orders
KafkaAdmin->>Broker: CreateTopics(tenant-acme-orders)
Onboarding->>DynamicListener: startListener(tenant-acme-orders, ...)
DynamicListener->>Broker: subscribe(tenant-acme-orders)
Note over DynamicListener: Consuming tenant-acme-orders
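Because the topic name is derived from a tenant ID that usually arrives through an API, it can help to validate the result before asking the broker to create it. The sketch below assumes the `tenant-<id>-orders` convention from the example above. `TenantTopics` and `ordersTopic` are hypothetical names. The character and length limits reflect Kafka's topic naming rules as I understand them, so check them against your broker version.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that builds and validates per-tenant topic names. */
final class TenantTopics {
    // Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-',
    // and may be at most 249 characters long.
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    private TenantTopics() {
    }

    /** Returns "tenant-<tenantId>-orders", or throws if the name would be illegal. */
    static String ordersTopic(String tenantId) {
        if (tenantId == null || tenantId.isEmpty()) {
            throw new IllegalArgumentException("tenantId must not be empty");
        }
        String topic = "tenant-" + tenantId + "-orders";
        if (topic.length() > MAX_LENGTH || !LEGAL_NAME.matcher(topic).matches()) {
            throw new IllegalArgumentException("Illegal topic name for tenant: " + tenantId);
        }
        return topic;
    }
}
```

Calling `TenantTopics.ordersTopic("acme")` yields `tenant-acme-orders`, while an ID such as `acme/eu` is rejected before any topic or listener is created.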
Programmatic Listener with AcknowledgingMessageListener
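For manual acknowledgment in dynamic containers, use an AcknowledgingMessageListener and set the ack mode to MANUAL. Skipping acknowledge() only withholds the offset commit: the record is not redelivered until a restart or rebalance, and acknowledging a later record on the same partition commits past it. To retry a failed record, let the exception propagate to the container’s error handler instead of swallowing it.
AcknowledgingMessageListener<String, Object> listener = (record, acknowledgment) -> {
    try {
        processor.process(record.value());
        acknowledgment.acknowledge(); // acknowledge only after successful processing
    } catch (TransientException e) {
        // Swallowing the exception leaves this offset uncommitted but does not trigger a retry
        log.warn("Transient failure, not acknowledging: {}", e.getMessage());
    }
};

container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
container.getContainerProperties().setMessageListener(listener);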
Registering with KafkaListenerEndpointRegistry
To make dynamically created containers discoverable alongside @KafkaListener containers:
@Autowired
private KafkaListenerEndpointRegistry registry;

public void startAndRegister(String topic, String groupId,
                             MessageListener<String, Object> listener) {
    // buildEndpoint is a helper you supply (not shown). It is assumed to return a
    // KafkaListenerEndpoint with a unique id, for example "dynamic-" + topic,
    // that carries the topic, the group id, and the listener.
    KafkaListenerEndpoint endpoint = buildEndpoint(topic, groupId, listener);

    // The registry creates the container from the factory itself. Do not also call
    // factory.createContainer(topic) and start() here: that would run a second,
    // unregistered consumer on the same topic.
    registry.registerListenerContainer(
            endpoint,
            factory,
            true // startImmediately
    );
    // The container now appears in registry.getListenerContainerIds()
}
Registering makes the container pauseable/resumable via registry.getListenerContainer(id) like any @KafkaListener.
Concurrency on Dynamic Containers
container.setConcurrency(3); // 3 consumer threads per container
container.start();
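Each concurrency unit creates one consumer thread. Set the concurrency before calling start(), and keep it no higher than the topic’s partition count, because a consumer beyond that number is assigned no partitions.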
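The sizing rule can be written as a small pure function. This is only a sketch: `ConcurrencySizing` and `effectiveConcurrency` are hypothetical names, and the partition count is assumed to be known already (for example from your topic configuration).

```java
/** Hypothetical helper for choosing a container concurrency. */
final class ConcurrencySizing {

    private ConcurrencySizing() {
    }

    /**
     * Caps the requested number of consumer threads at the partition count,
     * and never returns less than one.
     */
    static int effectiveConcurrency(int requested, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        return Math.max(1, Math.min(requested, partitionCount));
    }
}
```

For a 3-partition topic, a requested concurrency of 8 is reduced to 3, and the result would then be passed to `container.setConcurrency(...)` before the container is started.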
Graceful Shutdown
Dynamic containers are not managed by Spring’s lifecycle by default. Shut them down explicitly:
@PreDestroy
public void shutdown() {
    // Log the map key: a container created directly from the factory may have no listener id
    containers.forEach((topic, container) -> {
        container.stop();
        log.info("Stopped dynamic container for topic: {}", topic);
    });
    containers.clear();
}
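If one container throws while stopping, a plain loop would skip the remaining ones. The sketch below shows one way to isolate failures so every container still gets a stop call. `Stoppable` and `ShutdownHelper` are hypothetical names used for illustration, with `Stoppable` standing in for the real container type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a listener container; not a Spring Kafka type. */
interface Stoppable {
    void stop();
}

/** Hypothetical helper that stops every container even if some of them fail. */
final class ShutdownHelper {

    private ShutdownHelper() {
    }

    /** Stops all containers, empties the map, and returns the topics whose stop failed. */
    static List<String> stopAll(Map<String, Stoppable> containers) {
        List<String> failedTopics = new ArrayList<>();
        containers.forEach((topic, container) -> {
            try {
                container.stop();
            } catch (RuntimeException e) {
                // Record the failure and keep going so the other containers still stop
                failedTopics.add(topic);
            }
        });
        containers.clear();
        return failedTopics;
    }
}
```

The returned list gives the shutdown hook something concrete to log, without letting a single failing container block the others.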
Key Takeaways
- ConcurrentMessageListenerContainer is the programmatic equivalent of @KafkaListener: create it from the factory and call start()
- Store containers in a ConcurrentHashMap keyed by topic or tenant ID for lifecycle management
- Use AcknowledgingMessageListener for manual acknowledgment in dynamic containers, and set AckMode.MANUAL on the container properties
- Register dynamic containers with KafkaListenerEndpointRegistry to make them pauseable via the registry API
- Always implement @PreDestroy to stop containers gracefully, because unmanaged containers won’t stop on application shutdown
- Size concurrency to the topic’s partition count, because extra threads receive no partitions