KafkaAdmin and AdminClient: Managing Topics Programmatically

Why Manage Topics Programmatically?

CLI commands work for one-time setup. Production services need:

  • Startup validation — verify required topics exist before the application starts
  • Auto-provisioning — create topics at deployment time with correct config
  • Dynamic tenant onboarding — create per-tenant topics at runtime
  • Config drift detection — compare actual topic config against expected values

Spring Kafka provides KafkaAdmin for declarative topic management and AdminClient for imperative operations.


KafkaAdmin — Declarative Topic Creation

Declare NewTopic beans — Spring Kafka creates them at startup if they don’t exist:

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
            .partitions(6)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG,
                String.valueOf(Duration.ofDays(7).toMillis()))
            .config(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
            .build();
    }

    @Bean
    public NewTopic ordersDltTopic() {
        return TopicBuilder.name("orders.DLT")
            .partitions(6)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG,
                String.valueOf(Duration.ofDays(30).toMillis()))
            .build();
    }

    @Bean
    public NewTopic customerProfilesTopic() {
        return TopicBuilder.name("customer-profiles")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")  // KTable backing topic
            .build();
    }
}

KafkaAdmin is auto-configured by Spring Boot. If you need to configure it explicitly:

@Bean
public KafkaAdmin kafkaAdmin() {
    KafkaAdmin admin = new KafkaAdmin(
        Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    );
    admin.setFatalIfBrokerNotAvailable(true);  // fail fast if broker unreachable at startup
    return admin;
}

Fail-Fast Startup Validation

setFatalIfBrokerNotAvailable(true) makes the application refuse to start if the broker is unreachable — preferable to silent startup followed by mysterious consumer failures.

spring.kafka.admin.fail-fast=true

AdminClient — Imperative Operations

For runtime operations, use AdminClient directly:

@Service
@RequiredArgsConstructor
public class TopicManagementService {

    private final KafkaAdmin kafkaAdmin;

    private AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }
}

Or inject it as a bean:

@Bean
public AdminClient adminClient(KafkaAdmin kafkaAdmin) {
    return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}

Creating Topics at Runtime

public void createTenantTopic(String tenantId) {
    String topicName = "tenant-" + tenantId + "-orders";

    NewTopic topic = TopicBuilder.name(topicName)
        .partitions(3)
        .replicas(1)
        .build();

    try (AdminClient admin = adminClient()) {
        CreateTopicsResult result = admin.createTopics(List.of(topic));
        result.all().get(30, TimeUnit.SECONDS);
        log.info("Created topic: {}", topicName);
    } catch (TopicExistsException e) {
        log.info("Topic already exists: {}", topicName);
    } catch (ExecutionException | InterruptedException | TimeoutException e) {
        throw new RuntimeException("Failed to create topic: " + topicName, e);
    }
}

Checking Topic Existence

public boolean topicExists(String topicName) {
    try (AdminClient admin = adminClient()) {
        Set<String> topics = admin.listTopics().names().get(10, TimeUnit.SECONDS);
        return topics.contains(topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to list topics", e);
    }
}

Describing Topics

public TopicDescription describeTopic(String topicName) {
    try (AdminClient admin = adminClient()) {
        DescribeTopicsResult result = admin.describeTopics(List.of(topicName));
        Map<String, TopicDescription> descriptions =
            result.allTopicNames().get(10, TimeUnit.SECONDS);
        return descriptions.get(topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to describe topic: " + topicName, e);
    }
}

// Usage:
TopicDescription desc = describeTopic("orders");
log.info("Topic: {} partitions: {} isInternal: {}",
    desc.name(), desc.partitions().size(), desc.isInternal());

desc.partitions().forEach(p ->
    log.info("  Partition {} leader: {} replicas: {}",
        p.partition(), p.leader().id(),
        p.replicas().stream().map(n -> String.valueOf(n.id()))
            .collect(Collectors.joining(", ")))
);

Altering Topic Configuration

public void setRetention(String topicName, Duration retention) {
    ConfigEntry retentionEntry = new ConfigEntry(
        TopicConfig.RETENTION_MS_CONFIG,
        String.valueOf(retention.toMillis())
    );

    AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

    try (AdminClient admin = adminClient()) {
        admin.incrementalAlterConfigs(Map.of(resource, List.of(op)))
            .all()
            .get(30, TimeUnit.SECONDS);
        log.info("Updated retention for {} to {}ms", topicName, retention.toMillis());
    } catch (Exception e) {
        throw new RuntimeException("Failed to alter config for: " + topicName, e);
    }
}

Deleting Topics

public void deleteTopic(String topicName) {
    try (AdminClient admin = adminClient()) {
        admin.deleteTopics(List.of(topicName)).all().get(30, TimeUnit.SECONDS);
        log.info("Deleted topic: {}", topicName);
    } catch (Exception e) {
        throw new RuntimeException("Failed to delete topic: " + topicName, e);
    }
}

Describing Consumer Group Offsets

Inspect how far behind a consumer group is:

public void describeConsumerGroup(String groupId) {
    try (AdminClient admin = adminClient()) {
        ListConsumerGroupOffsetsResult result =
            admin.listConsumerGroupOffsets(groupId);

        Map<TopicPartition, OffsetAndMetadata> offsets =
            result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);

        offsets.forEach((tp, offsetMeta) ->
            log.info("Group={} topic={} partition={} committedOffset={}",
                groupId, tp.topic(), tp.partition(), offsetMeta.offset())
        );
    } catch (Exception e) {
        throw new RuntimeException("Failed to describe consumer group: " + groupId, e);
    }
}

Checking Topic Configurations at Startup

Verify a topic has the expected settings before the application starts:

@Component
@RequiredArgsConstructor
public class TopicConfigValidator implements ApplicationRunner {

    private final KafkaAdmin kafkaAdmin;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        try (AdminClient admin = AdminClient.create(
                kafkaAdmin.getConfigurationProperties())) {

            ConfigResource resource = new ConfigResource(
                ConfigResource.Type.TOPIC, "orders");

            Config config = admin.describeConfigs(List.of(resource))
                .all()
                .get(10, TimeUnit.SECONDS)
                .get(resource);

            String retentionMs = config.get(TopicConfig.RETENTION_MS_CONFIG).value();
            long retention = Long.parseLong(retentionMs);

            if (retention < Duration.ofDays(7).toMillis()) {
                throw new IllegalStateException(
                    "orders topic retention is too low: " + retentionMs + "ms");
            }

            log.info("Topic config validated: orders retention={}ms", retentionMs);
        }
    }
}

Key Takeaways

  • Declare NewTopic beans and Spring Kafka creates them at startup if they don’t exist — this is the recommended approach for static topics
  • Use KafkaAdmin.setFatalIfBrokerNotAvailable(true) to fail fast if the broker is unreachable at startup
  • AdminClient gives you full imperative control: create, describe, alter config, delete topics, inspect consumer group offsets
  • Always close AdminClient — use try-with-resources to avoid connection leaks
  • createTopics() throws TopicExistsException wrapped in ExecutionException — catch it explicitly to handle idempotent creation
  • Use ApplicationRunner + AdminClient at startup to validate critical topic configurations before the application begins processing

Next: Testing Kafka Applications: EmbeddedKafka and Testcontainers — write reliable integration tests using @EmbeddedKafka for fast unit-like tests and KafkaContainer (Testcontainers) for real-broker validation.