Dynamic Listener Containers and Programmatic Topic Registration

Why Dynamic Listeners?

@KafkaListener endpoints are declared in source code and registered when the application starts. Some scenarios need listeners that are created at runtime:

  • Multi-tenant SaaS — each tenant onboards to their own topic; you can’t redeploy to add @KafkaListener for each new tenant
  • Feature flags — enable or disable a listener without a deployment
  • Plugin systems — modules register their own topic subscriptions when loaded
  • Admin APIs — operators subscribe to new topics via a REST endpoint

ConcurrentMessageListenerContainer

The core building block is ConcurrentMessageListenerContainer — the same class @KafkaListener uses internally, but constructed and started manually:

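import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Slf4j  // provides the log field used below
@Service
@RequiredArgsConstructor
public class DynamicListenerService {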

    private final ConcurrentKafkaListenerContainerFactory<String, Object> factory;
    private final Map<String, ConcurrentMessageListenerContainer<String, Object>> containers =
        new ConcurrentHashMap<>();

    public void startListener(String topic, String groupId,
                               MessageListener<String, Object> listener) {
        ConcurrentMessageListenerContainer<String, Object> container =
            factory.createContainer(topic);

        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(listener);

        // putIfAbsent makes check-and-register atomic, so two concurrent calls
        // for the same topic cannot both start a container.
        if (containers.putIfAbsent(topic, container) != null) {
            throw new IllegalStateException("Listener already registered for: " + topic);
        }
        container.start();
        log.info("Started dynamic listener for topic: {}", topic);
    }

    public void stopListener(String topic) {
        ConcurrentMessageListenerContainer<String, Object> container =
            containers.remove(topic);
        if (container != null) {
            container.stop();
            log.info("Stopped dynamic listener for topic: {}", topic);
        }
    }
}

REST-Controlled Dynamic Subscription

@RestController
@RequestMapping("/admin/listeners")
@RequiredArgsConstructor
public class DynamicListenerController {

    private final DynamicListenerService listenerService;
    private final OrderEventProcessor processor;

    @PostMapping
    public ResponseEntity<String> subscribe(@RequestBody SubscribeRequest request) {
        listenerService.startListener(
            request.getTopic(),
            request.getGroupId(),
            record -> processor.process(record)
        );
        return ResponseEntity.ok("Subscribed to: " + request.getTopic());
    }

    @DeleteMapping("/{topic}")
    public ResponseEntity<String> unsubscribe(@PathVariable String topic) {
        listenerService.stopListener(topic);
        return ResponseEntity.ok("Unsubscribed from: " + topic);
    }
}
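
The endpoint above passes request.getTopic() straight to the listener service, so a caller can submit any string. A minimal sketch of a guard follows, assuming Kafka's topic naming rules (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."). The TopicNames class and its method are hypothetical and not part of Spring Kafka.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: checks a topic name before a listener is registered for it. */
final class TopicNames {

    // Legal topic characters: ASCII alphanumerics, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    private TopicNames() {
    }

    /** Returns true when the name is non-empty, legal, short enough, and not "." or "..". */
    static boolean isValid(String topic) {
        if (topic == null || topic.isEmpty() || topic.length() > MAX_LENGTH) {
            return false;
        }
        if (topic.equals(".") || topic.equals("..")) {
            return false;
        }
        return LEGAL.matcher(topic).matches();
    }
}
```

A controller could return 400 Bad Request when TopicNames.isValid(request.getTopic()) is false, before calling startListener.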

Multi-Tenant Example

Onboard a new tenant and start consuming their topic dynamically:

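import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;

@Slf4j  // provides the log field used below
@Service
@RequiredArgsConstructor
public class TenantOnboardingService {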

    private final DynamicListenerService listenerService;
    private final KafkaAdmin kafkaAdmin;
    private final TenantEventProcessor processor;

    public void onboardTenant(String tenantId) {
        String topic = "tenant-" + tenantId + "-orders";

        // Create the topic
        kafkaAdmin.createOrModifyTopics(
            TopicBuilder.name(topic).partitions(3).replicas(1).build()
        );

        // Start a dedicated listener
        listenerService.startListener(
            topic,
            "platform-order-service",
            record -> processor.process(tenantId, record.value())
        );

        log.info("Tenant {} onboarded — listening on {}", tenantId, topic);
    }

    public void offboardTenant(String tenantId) {
        String topic = "tenant-" + tenantId + "-orders";
        listenerService.stopListener(topic);
    }
}

sequenceDiagram
    participant Admin as "Admin REST"
    participant Onboarding as "TenantOnboardingService"
    participant KafkaAdmin
    participant DynamicListener as "DynamicListenerService"
    participant Broker

    Admin->>Onboarding: POST /tenants/acme/onboard
    Onboarding->>KafkaAdmin: create topic tenant-acme-orders
    KafkaAdmin->>Broker: CreateTopics(tenant-acme-orders)
    Onboarding->>DynamicListener: startListener(tenant-acme-orders, ...)
    DynamicListener->>Broker: subscribe(tenant-acme-orders)
    Note over DynamicListener: Consuming tenant-acme-orders
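
onboardTenant and offboardTenant each build the topic name by hand, so the two can drift apart. One way to keep them in sync is a small naming helper. The sketch below assumes the "tenant-<id>-orders" convention used above; the TenantTopics class and its methods are hypothetical.

```java
import java.util.Optional;

/** Hypothetical helper that centralizes the tenant topic naming convention. */
final class TenantTopics {

    private static final String PREFIX = "tenant-";
    private static final String SUFFIX = "-orders";

    private TenantTopics() {
    }

    /** Builds the orders topic name for a tenant, rejecting null or blank IDs. */
    static String topicFor(String tenantId) {
        if (tenantId == null || tenantId.isBlank()) {
            throw new IllegalArgumentException("tenantId must not be blank");
        }
        return PREFIX + tenantId + SUFFIX;
    }

    /** Extracts the tenant ID from a topic name, or returns empty if the name does not match. */
    static Optional<String> tenantIdOf(String topic) {
        if (topic == null
                || !topic.startsWith(PREFIX)
                || !topic.endsWith(SUFFIX)
                || topic.length() <= PREFIX.length() + SUFFIX.length()) {
            return Optional.empty();
        }
        return Optional.of(topic.substring(PREFIX.length(), topic.length() - SUFFIX.length()));
    }
}
```

Both onboarding and offboarding could then call TenantTopics.topicFor(tenantId) instead of concatenating strings.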

Programmatic Listener with AcknowledgingMessageListener

For manual acknowledgment in dynamic containers:

AcknowledgingMessageListener<String, Object> listener = (record, acknowledgment) -> {
    try {
        processor.process(record.value());
        acknowledgment.acknowledge();
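    } catch (TransientException e) {
        // Not acknowledging is not enough to get the record back: the consumer moves on,
        // and a later acknowledge() commits past this offset. Rethrow so the container's
        // error handler can seek back and redeliver (assumes TransientException is unchecked).
        log.warn("Transient failure, leaving record unacknowledged: {}", e.getMessage());
        throw e;
    }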
};

container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
container.getContainerProperties().setMessageListener(listener);
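
Leaving a record unacknowledged does not, on its own, make Kafka deliver it again. The committed position for a partition is a single offset, so acknowledging a later record moves it past any earlier record that was skipped. The toy model below illustrates this with a hypothetical PartitionAckModel class. It is a simplification of commit semantics, not Spring Kafka code.

```java
/** Toy model of one partition's committed offset under manual acknowledgment. */
final class PartitionAckModel {

    // Offset of the next record a restarted consumer would read; 0 means nothing committed.
    private long committed = 0;

    /** Acknowledging offset n commits n + 1; this model keeps the highest position seen. */
    void acknowledge(long offset) {
        committed = Math.max(committed, offset + 1);
    }

    long committedOffset() {
        return committed;
    }

    /** True when a consumer restarting from the committed offset would read this record again. */
    boolean wouldBeRedelivered(long offset) {
        return offset >= committed;
    }
}
```

In this model, if offset 1 is skipped and offset 2 is then acknowledged, the committed offset becomes 3 and offset 1 would no longer be read after a restart. For a record to be retried, the listener has to signal the failure, for example by throwing so that the container's error handler can seek back to it.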

Registering with KafkaListenerEndpointRegistry

To make dynamically created containers discoverable alongside @KafkaListener containers:

@Autowired
private KafkaListenerEndpointRegistry registry;

public void startAndRegister(String topic, String groupId,
                              MessageListener<String, Object> listener) {
    // The registry builds the container from the endpoint through the factory.
    // Creating and starting a second container by hand here would put two
    // consumers on the topic, so only the endpoint is registered.
    // buildEndpoint is an application helper (not shown) that returns a
    // KafkaListenerEndpoint carrying a unique id, the topic, the group ID,
    // and the listener.
    // Register so it appears in registry.getListenerContainerIds()
    registry.registerListenerContainer(
        buildEndpoint("dynamic-" + topic, topic, groupId, listener),
        factory,
        true  // startImmediately
    );
}

Once registered, the container can be paused and resumed through registry.getListenerContainer(id), where id is the ID of the registered endpoint, just like a container created for @KafkaListener.


Concurrency on Dynamic Containers

container.setConcurrency(3);  // 3 consumer threads per container
container.start();

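Each unit of concurrency creates one consumer with its own thread. Set the value no higher than the topic’s partition count: consumers beyond that number are assigned no partitions and sit idle.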
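
Because consumers beyond the partition count are assigned nothing, the requested concurrency can be capped before it is applied. A minimal sketch, assuming the partition count is already known (for example, the value passed to TopicBuilder); the ConcurrencySizing class is hypothetical.

```java
/** Hypothetical helper for choosing a container's concurrency. */
final class ConcurrencySizing {

    private ConcurrencySizing() {
    }

    /** Caps the requested concurrency at the partition count, with a floor of one. */
    static int effectiveConcurrency(int requested, int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        return Math.max(1, Math.min(requested, partitions));
    }
}
```

The result would be passed to container.setConcurrency(...) before start(). For the three-partition tenant topics created earlier, a requested value of 8 becomes 3.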


Graceful Shutdown

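Containers created with factory.createContainer(...) are not Spring beans, so the application context does not stop them at shutdown. (Containers registered through KafkaListenerEndpointRegistry are stopped by the registry.) Shut the unmanaged ones down explicitly: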

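@PreDestroy
public void shutdown() {
    // Log the map key: containers from createContainer() have no bean name set,
    // so getListenerId() would not identify them.
    containers.forEach((topic, container) -> {
        container.stop();
        log.info("Stopped dynamic container for topic: {}", topic);
    });
    containers.clear();
}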
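
If one container's stop() throws inside the loop, the remaining containers are never stopped. A minimal sketch of a loop that keeps going follows. It uses a hypothetical Stoppable interface in place of the real container type so that it runs without Spring on the classpath; ShutdownHelper is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a listener container; only stop() matters here. */
interface Stoppable {
    void stop();
}

final class ShutdownHelper {

    private ShutdownHelper() {
    }

    /**
     * Stops every container even if some fail, then clears the map.
     * Returns the keys whose stop() threw so the caller can log them.
     */
    static List<String> stopAll(Map<String, ? extends Stoppable> containers) {
        List<String> failed = new ArrayList<>();
        containers.forEach((topic, container) -> {
            try {
                container.stop();
            } catch (RuntimeException e) {
                failed.add(topic);  // remember the failure and continue with the rest
            }
        });
        containers.clear();
        return failed;
    }
}
```

In the service, the same try/catch could wrap container.stop() inside the @PreDestroy method, with a warning logged for each failed topic.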

Key Takeaways

  • ConcurrentMessageListenerContainer is the programmatic equivalent of @KafkaListener — create it from the factory and call start()
  • Store containers in a ConcurrentHashMap keyed by topic or tenant ID for lifecycle management
  • Use AcknowledgingMessageListener for manual acknowledgment in dynamic containers — set AckMode.MANUAL on the container properties
  • Register dynamic containers with KafkaListenerEndpointRegistry to make them pauseable via the registry API
  • Always implement @PreDestroy to stop containers gracefully — unmanaged containers won’t stop on application shutdown
  • Size concurrency to the topic’s partition count — extra threads receive no partitions
