KafkaAdmin and AdminClient: Managing Topics Programmatically
Why Manage Topics Programmatically?
CLI commands work for one-time setup. Production services need:
- Startup validation — verify required topics exist before the application starts
- Auto-provisioning — create topics at deployment time with correct config
- Dynamic tenant onboarding — create per-tenant topics at runtime
- Config drift detection — compare actual topic config against expected values
Spring Kafka provides KafkaAdmin for declarative topic management and AdminClient for imperative operations.
KafkaAdmin — Declarative Topic Creation
Declare NewTopic beans — Spring Kafka creates them at startup if they don’t exist:
```java
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
                .partitions(6)
                .replicas(3)
                .config(TopicConfig.RETENTION_MS_CONFIG,
                        String.valueOf(Duration.ofDays(7).toMillis()))
                .config(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
                .build();
    }

    @Bean
    public NewTopic ordersDltTopic() {
        return TopicBuilder.name("orders.DLT")
                .partitions(6)
                .replicas(3)
                .config(TopicConfig.RETENTION_MS_CONFIG,
                        String.valueOf(Duration.ofDays(30).toMillis()))
                .build();
    }

    @Bean
    public NewTopic customerProfilesTopic() {
        return TopicBuilder.name("customer-profiles")
                .partitions(3)
                .replicas(3)
                .config(TopicConfig.CLEANUP_POLICY_CONFIG, "compact") // KTable backing topic
                .build();
    }
}
```
KafkaAdmin is auto-configured by Spring Boot. If you need to configure it explicitly:
```java
@Bean
public KafkaAdmin kafkaAdmin() {
    KafkaAdmin admin = new KafkaAdmin(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
    admin.setFatalIfBrokerNotAvailable(true); // fail fast if broker unreachable at startup
    return admin;
}
```
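With Spring Boot's auto-configuration, the same settings can usually be expressed as properties instead of an explicit bean. A minimal sketch (the `request.timeout.ms` value is illustrative):

```properties
spring.kafka.bootstrap-servers=localhost:9092
# Entries under spring.kafka.admin.properties.* are passed through
# verbatim to the underlying AdminClient.
spring.kafka.admin.properties.request.timeout.ms=30000
```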
Fail-Fast Startup Validation
setFatalIfBrokerNotAvailable(true) makes the application refuse to start if the broker is unreachable — preferable to a silent startup followed by mysterious consumer failures. The equivalent Spring Boot property:

```properties
spring.kafka.admin.fail-fast=true
```
AdminClient — Imperative Operations
For runtime operations, use AdminClient directly:
```java
@Slf4j
@Service
@RequiredArgsConstructor
public class TopicManagementService {

    private final KafkaAdmin kafkaAdmin;

    // Each call creates a fresh client; callers close it via try-with-resources.
    private AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}
```
Or inject it as a bean; since AdminClient implements AutoCloseable, Spring closes it automatically at shutdown:

```java
@Bean
public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
    return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
```
Creating Topics at Runtime
```java
public void createTenantTopic(String tenantId) {
    String topicName = "tenant-" + tenantId + "-orders";
    NewTopic topic = TopicBuilder.name(topicName)
            .partitions(3)
            .replicas(1)
            .build();
    try (AdminClient admin = adminClient()) {
        CreateTopicsResult result = admin.createTopics(List.of(topic));
        result.all().get(30, TimeUnit.SECONDS);
        log.info("Created topic: {}", topicName);
    } catch (ExecutionException e) {
        // createTopics() never throws TopicExistsException directly --
        // it arrives wrapped as the cause of the ExecutionException.
        if (e.getCause() instanceof TopicExistsException) {
            log.info("Topic already exists: {}", topicName);
        } else {
            throw new RuntimeException("Failed to create topic: " + topicName, e);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        throw new RuntimeException("Interrupted while creating topic: " + topicName, e);
    } catch (TimeoutException e) {
        throw new RuntimeException("Timed out creating topic: " + topicName, e);
    }
}
```
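Runtime-generated names deserve validation before they reach the broker: Kafka topic names are limited to letters, digits, '.', '_' and '-', with a maximum length of 249 characters. A small hypothetical helper (TopicNames is not part of any library) sketching that check:

```java
import java.util.regex.Pattern;

public class TopicNames {
    // Kafka's legal topic-name characters and length limit.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    // Builds the per-tenant topic name used above, rejecting illegal input
    // before it can produce a cryptic broker-side error.
    public static String tenantTopic(String tenantId) {
        String name = "tenant-" + tenantId + "-orders";
        if (!LEGAL.matcher(name).matches() || name.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("Illegal topic name: " + name);
        }
        return name;
    }
}
```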
Checking Topic Existence
```java
public boolean topicExists(String topicName) {
    try (AdminClient admin = adminClient()) {
        Set<String> topics = admin.listTopics().names().get(10, TimeUnit.SECONDS);
        return topics.contains(topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to list topics", e);
    }
}
```
Describing Topics
```java
public TopicDescription describeTopic(String topicName) {
    try (AdminClient admin = adminClient()) {
        DescribeTopicsResult result = admin.describeTopics(List.of(topicName));
        Map<String, TopicDescription> descriptions =
                result.allTopicNames().get(10, TimeUnit.SECONDS);
        return descriptions.get(topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to describe topic: " + topicName, e);
    }
}
```

Usage:

```java
TopicDescription desc = describeTopic("orders");
log.info("Topic: {} partitions: {} isInternal: {}",
        desc.name(), desc.partitions().size(), desc.isInternal());
desc.partitions().forEach(p ->
        log.info("  Partition {} leader: {} replicas: {}",
                p.partition(), p.leader().id(),
                p.replicas().stream().map(n -> String.valueOf(n.id()))
                        .collect(Collectors.joining(", "))));
```
Altering Topic Configuration
```java
public void setRetention(String topicName, Duration retention) {
    ConfigEntry retentionEntry = new ConfigEntry(
            TopicConfig.RETENTION_MS_CONFIG,
            String.valueOf(retention.toMillis()));
    AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    try (AdminClient admin = adminClient()) {
        admin.incrementalAlterConfigs(Map.of(resource, List.of(op)))
                .all()
                .get(30, TimeUnit.SECONDS);
        log.info("Updated retention for {} to {}ms", topicName, retention.toMillis());
    } catch (Exception e) {
        throw new RuntimeException("Failed to alter config for: " + topicName, e);
    }
}
```
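The string values involved are easy to get wrong: retention.ms (the literal key behind TopicConfig.RETENTION_MS_CONFIG) takes a millisecond count as a string. A small hypothetical helper (RetentionValues is illustrative, not a library class) showing the conversion:

```java
import java.time.Duration;
import java.util.Map;

public class RetentionValues {
    // Builds the string-valued entry AdminClient expects; "retention.ms"
    // is the literal value of TopicConfig.RETENTION_MS_CONFIG.
    public static Map<String, String> retentionConfig(Duration retention) {
        return Map.of("retention.ms", String.valueOf(retention.toMillis()));
    }
}
```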
Deleting Topics
Deletion requires delete.topic.enable=true on the brokers (the default since Kafka 1.0):

```java
public void deleteTopic(String topicName) {
    try (AdminClient admin = adminClient()) {
        admin.deleteTopics(List.of(topicName)).all().get(30, TimeUnit.SECONDS);
        log.info("Deleted topic: {}", topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to delete topic: " + topicName, e);
    }
}
```
Describing Consumer Group Offsets
Inspect how far behind a consumer group is:
```java
public void describeConsumerGroup(String groupId) {
    try (AdminClient admin = adminClient()) {
        ListConsumerGroupOffsetsResult result =
                admin.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> offsets =
                result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
        offsets.forEach((tp, offsetMeta) ->
                log.info("Group={} topic={} partition={} committedOffset={}",
                        groupId, tp.topic(), tp.partition(), offsetMeta.offset()));
    } catch (Exception e) {
        throw new RuntimeException("Failed to describe consumer group: " + groupId, e);
    }
}
```
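Committed offsets alone don't tell you how far behind the group is; lag is the log-end offset minus the committed offset. In real code you would fetch end offsets with AdminClient.listOffsets; here is a sketch of just the arithmetic, with plain "topic-partition" string keys standing in for TopicPartition:

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalculator {
    // Lag per partition = log-end offset minus committed offset, floored
    // at zero in case the committed offset is momentarily ahead. Partitions
    // with no committed offset are treated as fully lagging from offset 0.
    public static Map<String, Long> lag(Map<String, Long> endOffsets,
                                        Map<String, Long> committed) {
        Map<String, Long> result = new HashMap<>();
        endOffsets.forEach((tp, end) -> {
            long c = committed.getOrDefault(tp, 0L);
            result.put(tp, Math.max(0L, end - c));
        });
        return result;
    }
}
```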
Checking Topic Configurations at Startup
Verify a topic has the expected settings before the application starts:
```java
@Slf4j
@Component
@RequiredArgsConstructor
public class TopicConfigValidator implements ApplicationRunner {

    private final KafkaAdmin kafkaAdmin;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        try (AdminClient admin = AdminClient.create(
                kafkaAdmin.getConfigurationProperties())) {
            ConfigResource resource = new ConfigResource(
                    ConfigResource.Type.TOPIC, "orders");
            Config config = admin.describeConfigs(List.of(resource))
                    .all()
                    .get(10, TimeUnit.SECONDS)
                    .get(resource);
            String retentionMs = config.get(TopicConfig.RETENTION_MS_CONFIG).value();
            long retention = Long.parseLong(retentionMs);
            if (retention < Duration.ofDays(7).toMillis()) {
                throw new IllegalStateException(
                        "orders topic retention is too low: " + retentionMs + "ms");
            }
            log.info("Topic config validated: orders retention={}ms", retentionMs);
        }
    }
}
```
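The validator above checks a single setting on a single topic; config drift detection generalizes this to comparing a whole expected map against what describeConfigs returns. A hypothetical helper (ConfigDrift is illustrative, not a library class) for the comparison step:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigDrift {
    // Returns the expected entries whose actual value differs or is missing.
    // Keys are raw config names, e.g. "retention.ms", "cleanup.policy";
    // in real code the actual map would be built from describeConfigs().
    public static Map<String, String> drift(Map<String, String> expected,
                                            Map<String, String> actual) {
        Map<String, String> mismatches = new HashMap<>();
        expected.forEach((key, want) -> {
            String got = actual.get(key);
            if (!want.equals(got)) {
                mismatches.put(key, "expected=" + want + " actual=" + got);
            }
        });
        return mismatches;
    }
}
```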
Key Takeaways
- Declare NewTopic beans and Spring Kafka creates them at startup if they don't exist — this is the recommended approach for static topics
- Use KafkaAdmin.setFatalIfBrokerNotAvailable(true) to fail fast if the broker is unreachable at startup
- AdminClient gives you full imperative control: create, describe, alter config, delete topics, inspect consumer group offsets
- Always close AdminClient — use try-with-resources to avoid connection leaks
- createTopics() reports TopicExistsException as the cause of an ExecutionException — check for it to make creation idempotent
- Use ApplicationRunner + AdminClient at startup to validate critical topic configurations before the application begins processing
Next: Testing Kafka Applications: EmbeddedKafka and Testcontainers — write reliable integration tests using @EmbeddedKafka for fast unit-like tests and KafkaContainer (Testcontainers) for real-broker validation.