Testing Kafka Applications: EmbeddedKafka and Testcontainers

Two Testing Strategies

| Strategy | Speed | Fidelity | Use when |
|---|---|---|---|
| @EmbeddedKafka | Fast (~2s startup) | In-process broker, not 100% identical | Unit/integration tests (CI fast path) |
| KafkaContainer (Testcontainers) | Slower (~10s startup) | Real Kafka broker in Docker | Acceptance tests, DLT/transaction validation |

Use both: @EmbeddedKafka for the bulk of tests, KafkaContainer for the smoke suite that validates real-broker behaviour.


Test Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
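<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<!-- Provides the @Testcontainers and @Container JUnit 5 annotations used in the Testcontainers test -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>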
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

@EmbeddedKafka — Fast Integration Tests

Testing a Producer

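import java.time.Instant;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = {"orders"},
    brokerProperties = {"log.dir=target/kafka-test-logs"}
)
class OrderPublisherTest {

    @Autowired
    private OrderEventPublisher publisher;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldPublishOrderEvent() throws Exception {
        // Set up a consumer to read what was published
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        // consumerProps() configures an IntegerDeserializer for keys by default;
        // this consumer is typed <String, String>, so read keys as strings
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // try-with-resources closes the consumer even if an assertion fails
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders");

            // Publish
            OrderPlacedEvent event = new OrderPlacedEvent(
                UUID.randomUUID().toString(), "customer-1", 149.99, Instant.now().toEpochMilli()
            );
            publisher.publish(event);

            // Assert: getSingleRecord throws unless exactly one record arrives
            ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "orders");
            assertThat(record.value()).contains(event.getOrderId());
        }
    }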
}
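
The tests in this article construct OrderPlacedEvent with four arguments and call getOrderId(), but the class itself is never shown. Here is a minimal sketch of what such an event might look like. Apart from getOrderId(), every field and accessor name below is an assumption inferred from the constructor calls, not the application's actual definition.

```java
import java.util.Objects;

// Hypothetical event type matching the constructor calls used in the tests.
// Only getOrderId() appears in the article; the other names are assumed.
final class OrderPlacedEvent {
    private final String orderId;
    private final String customerId;
    private final double amount;
    private final long timestamp; // epoch milliseconds, as produced by Instant.toEpochMilli()

    OrderPlacedEvent(String orderId, String customerId, double amount, long timestamp) {
        this.orderId = Objects.requireNonNull(orderId, "orderId");
        this.customerId = Objects.requireNonNull(customerId, "customerId");
        this.amount = amount;
        this.timestamp = timestamp;
    }

    String getOrderId() { return orderId; }
    String getCustomerId() { return customerId; }
    double getAmount() { return amount; }
    long getTimestamp() { return timestamp; }
}
```

If the event travels as JSON, the deserializer on the consuming side will usually also need a way to construct the object (a no-argument constructor or explicit creator annotations, depending on the library).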

Testing a Consumer (Listener)

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"orders", "inventory-events"}
)
class InventoryEventListenerTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    @MockBean
    private InventoryService inventoryService;

    @Test
    void shouldCallInventoryServiceOnOrderEvent() {
        OrderPlacedEvent event = new OrderPlacedEvent(
            "order-123", "customer-1", 99.99, Instant.now().toEpochMilli()
        );

        kafkaTemplate.send("orders", event.getOrderId(), event);

        // Wait for async listener to process
        await()
            .atMost(Duration.ofSeconds(10))
            .untilAsserted(() ->
                verify(inventoryService, times(1)).reserveStock(any(OrderPlacedEvent.class))
            );
    }
}
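
To make the await() call above less of a black box, here is a JDK-only sketch of the idea behind it: re-run an assertion until it passes or a deadline expires. PollingAssert and its methods are hypothetical names for illustration and are not Awaitility's implementation.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of "retry an assertion until it passes or time runs out".
// PollingAssert is a hypothetical helper, not part of Awaitility.
final class PollingAssert {

    static void untilAsserted(Duration atMost, Duration pollInterval, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError failure) {
                if (System.nanoTime() - deadline >= 0) {
                    throw failure; // out of time: surface the last assertion failure
                }
            }
            sleep(pollInterval);
        }
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    // A background thread stands in for the asynchronous listener.
    static int demo() {
        AtomicInteger calls = new AtomicInteger();
        Thread listener = new Thread(() -> {
            sleep(Duration.ofMillis(100));
            calls.incrementAndGet();
        });
        listener.start();
        untilAsserted(Duration.ofSeconds(5), Duration.ofMillis(20), () -> {
            if (calls.get() != 1) {
                throw new AssertionError("expected 1 call but saw " + calls.get());
            }
        });
        return calls.get();
    }
}
```

One consequence of this design: the wait ends as soon as the assertion first passes, so a times(1) check cannot catch a duplicate delivery that arrives afterwards.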

Overriding Bootstrap Servers

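Spring Boot's Kafka auto-configuration connects to spring.kafka.bootstrap-servers, which defaults to localhost:9092, while @EmbeddedKafka starts its broker on a random port and publishes the address as spring.embedded.kafka.brokers. Use @TestPropertySource to map one onto the other. Newer spring-kafka releases set spring.kafka.bootstrap-servers for you by default, but declaring the mapping explicitly keeps the tests working on older versions: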

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "orders")
@TestPropertySource(properties = {
    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class OrderListenerTest {
    // spring.embedded.kafka.brokers is set automatically by @EmbeddedKafka
}
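
The ${...} syntax above is a placeholder that is substituted from other properties when the test context starts. As a rough illustration of that substitution step, here is a JDK-only sketch; PlaceholderSketch is a hypothetical class and a much simplified model, not Spring's resolver.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of ${name} substitution against a set of known properties.
// Hypothetical helper for illustration only.
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String value, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String replacement = properties.get(name);
            if (replacement == null) {
                // Fail fast instead of silently connecting to the wrong broker
                throw new IllegalArgumentException("Could not resolve placeholder '" + name + "'");
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

For example, with spring.embedded.kafka.brokers set to 127.0.0.1:54321 (an illustrative address), resolving "${spring.embedded.kafka.brokers}" yields that address, and a missing property is reported as an error rather than ignored.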

CountDownLatch for Listener Synchronization

When you can’t mock the service (e.g., testing the full flow), use a CountDownLatch:

@SpringBootTest
@EmbeddedKafka(topics = "orders")
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class FullFlowTest {

    @Autowired
    private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    @Autowired
    private TestListener testListener;

    @Test
    void shouldProcessOrderEndToEnd() throws Exception {
        kafkaTemplate.send("orders", "order-1",
            new OrderPlacedEvent("order-1", "customer-1", 50.0, Instant.now().toEpochMilli())
        );

        boolean received = testListener.latch.await(10, TimeUnit.SECONDS);
        assertThat(received).isTrue();
        assertThat(testListener.lastOrderId).isEqualTo("order-1");
    }
}

@Component
class TestListener {
    final CountDownLatch latch = new CountDownLatch(1);
    volatile String lastOrderId;

    @KafkaListener(topics = "orders", groupId = "test-group")
    public void onOrder(OrderPlacedEvent event) {
        lastOrderId = event.getOrderId();
        latch.countDown();
    }
}
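
A CountDownLatch cannot be reset once it reaches zero, so a shared listener bean like the one above only synchronizes the first test that uses it. Later tests in the same context would see await() return immediately. A possible variation, sketched here without Spring or Kafka, swaps in a fresh latch before each test. ResettableRecorder and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the TestListener idea with a latch that can be re-armed.
final class ResettableRecorder {
    private volatile CountDownLatch latch = new CountDownLatch(1);
    private volatile String lastOrderId;

    // Call from per-test setup so each test waits for its own message
    void reset() {
        lastOrderId = null;
        latch = new CountDownLatch(1);
    }

    // Stands in for the @KafkaListener method
    void onOrder(String orderId) {
        lastOrderId = orderId; // record the value before releasing the waiting test
        latch.countDown();
    }

    boolean awaitOrder(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lastOrderId() {
        return lastOrderId;
    }
}
```

After reset(), awaitOrder() blocks again until the next message arrives instead of returning immediately because of an earlier countdown.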

Testcontainers — Real Kafka Broker

@SpringBootTest
@Testcontainers
class OrderServiceIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.6.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    @Autowired
    private InventoryService inventoryService;

    @Test
    void shouldProcessOrderWithRealKafka() {
        OrderPlacedEvent event = new OrderPlacedEvent(
            "order-42", "customer-7", 299.99, Instant.now().toEpochMilli()
        );
        kafkaTemplate.send("orders", event.getOrderId(), event);

        await()
            .atMost(Duration.ofSeconds(30))
            .untilAsserted(() ->
                assertThat(inventoryService.isReserved("order-42")).isTrue()
            );
    }
}

@DynamicPropertySource injects the container’s bootstrap servers before the Spring context starts — no hardcoded ports.
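
The registry takes a method reference (kafka::getBootstrapServers) rather than a string because the mapped port is not known until the container is running. The following JDK-only sketch models that lazy lookup; FakeContainer, Registry, and the port number are stand-ins invented for illustration, not Testcontainers or Spring classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified model of supplier-based property registration.
final class LazyPropertySketch {

    // Stand-in for a container whose address is only known after start()
    static final class FakeContainer {
        private int mappedPort = -1;

        void start() {
            mappedPort = 49153; // illustrative; a real container is assigned a free port
        }

        String getBootstrapServers() {
            if (mappedPort < 0) {
                throw new IllegalStateException("container not started");
            }
            return "localhost:" + mappedPort;
        }
    }

    // Stand-in for a property registry that stores suppliers, not values
    static final class Registry {
        private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

        void add(String name, Supplier<Object> valueSupplier) {
            suppliers.put(name, valueSupplier);
        }

        Object resolve(String name) {
            return suppliers.get(name).get(); // the supplier runs only now
        }
    }

    static String demo() {
        FakeContainer kafka = new FakeContainer();
        Registry registry = new Registry();
        // Registering does not call getBootstrapServers(), so this is safe before start()
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        kafka.start();
        return (String) registry.resolve("spring.kafka.bootstrap-servers");
    }
}
```

Had the registry stored a plain string, the call to getBootstrapServers() would have run at registration time and failed in this sketch.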


Testing Dead Letter Topics

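Verify that a non-retryable exception routes the record to the DLT. The test assumes the application's error handler is configured with a dead-letter recoverer that publishes to <topic>.DLT, and that OrderValidationException is registered as non-retryable: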

@SpringBootTest
@EmbeddedKafka(topics = {"orders", "orders.DLT"})
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class DeadLetterTest {

    @Autowired
    private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @MockBean
    private InventoryService inventoryService;

    @Test
    void shouldRouteToDltOnNonRetryableException() throws Exception {
        // Make the inventory service throw a non-retryable exception
        doThrow(new OrderValidationException("Invalid order"))
            .when(inventoryService).reserveStock(any());

        kafkaTemplate.send("orders", "order-bad",
            new OrderPlacedEvent("order-bad", "customer-1", 0.0, Instant.now().toEpochMilli())
        );

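        // Read from DLT
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "dlt-test-group", "true", embeddedKafka);
        // consumerProps() configures an IntegerDeserializer for keys by default;
        // the record key here ("order-bad") is a String
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> dltConsumer = new KafkaConsumer<>(consumerProps)) {
            embeddedKafka.consumeFromAnEmbeddedTopic(dltConsumer, "orders.DLT");

            // getSingleRecord throws if no record arrives within the timeout,
            // so a separate null check is unnecessary
            ConsumerRecord<String, String> dltRecord =
                KafkaTestUtils.getSingleRecord(dltConsumer, "orders.DLT", Duration.ofSeconds(10));

            assertThat(dltRecord.value()).contains("order-bad");
        }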
    }
}
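
The routing decision this test exercises can be summarised as: retry retryable failures up to a limit, but send non-retryable failures to the dead letter topic on the first attempt. Below is a JDK-only sketch of that decision, assuming the <topic>.DLT naming used in the test. DltRoutingSketch is a hypothetical class and a simplified model, not Spring Kafka's error handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "retry, then dead-letter" with a non-retryable shortcut.
final class DltRoutingSketch {
    private final int maxAttempts;
    private final Class<? extends RuntimeException> notRetryable;
    private final List<String> deadLetters = new ArrayList<>();
    private int attempts;

    DltRoutingSketch(int maxAttempts, Class<? extends RuntimeException> notRetryable) {
        this.maxAttempts = maxAttempts;
        this.notRetryable = notRetryable;
    }

    void deliver(String topic, String value, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attempts++;
            try {
                listener.accept(value);
                return; // processed successfully
            } catch (RuntimeException e) {
                boolean giveUp = notRetryable.isInstance(e) || attempt == maxAttempts;
                if (giveUp) {
                    deadLetters.add(topic + ".DLT <- " + value);
                    return;
                }
                // otherwise fall through and retry
            }
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    int attempts() {
        return attempts;
    }
}
```

In this model a non-retryable failure reaches the DLT after a single attempt, which is why a short timeout is usually enough in the test above. A retryable exception would only arrive there after all retries are exhausted.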

Testing Kafka Transactions

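@SpringBootTest
@EmbeddedKafka(
    topics = {"orders", "orders-confirmed"},
    // The transaction state log defaults to replication factor 3 and min ISR 2,
    // which a single embedded broker may not be able to satisfy
    brokerProperties = {
        "transaction.state.log.replication.factor=1",
        "transaction.state.log.min.isr=1"
    }
)
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")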
class TransactionTest {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldRollbackOnException() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "tx-test-group", "false", embeddedKafka);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders-confirmed");

            // Simulated rollback: no records should reach orders-confirmed.
            // Checking the message distinguishes the simulated failure from, for example,
            // an exception raised because the producer factory is not transactional.
            assertThatThrownBy(() ->
                kafkaTemplate.executeInTransaction(ops -> {
                    ops.send("orders-confirmed", "order-1", "{}");
                    throw new RuntimeException("Simulated failure");
                })
            ).isInstanceOf(RuntimeException.class)
             .hasMessage("Simulated failure");

            // With read_committed, rolled-back records are invisible
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(3));
            assertThat(records.count()).isZero();
        }
    }
}
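
The isolation level matters in this test because an aborted record is still written to the topic; it is the consumer that filters it out. The default isolation level is read_uncommitted, under which the rolled-back record would be returned and the assertion would fail. The following JDK-only sketch models that filtering with a toy log. IsolationSketch is a hypothetical, heavily simplified model, not how the broker or client is implemented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy log with data entries and commit/abort markers per transaction.
final class IsolationSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final String txId;
        final String value;

        Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    void send(String txId, String value) { log.add(new Entry(Kind.DATA, txId, value)); }
    void commit(String txId) { log.add(new Entry(Kind.COMMIT, txId, null)); }
    void abort(String txId) { log.add(new Entry(Kind.ABORT, txId, null)); }

    List<String> read(boolean readCommitted) {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind == Kind.COMMIT) {
                committed.add(e.txId);
            }
        }
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue; // markers are never handed to the application
            }
            // read_uncommitted sees everything; read_committed only committed data
            if (!readCommitted || committed.contains(e.txId)) {
                visible.add(e.value);
            }
        }
        return visible;
    }
}
```

In this model, aborted and still-open transactions are both invisible to a read_committed reader, while a read_uncommitted reader sees every data entry.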

Test Utilities Reference

| Utility | Purpose |
|---|---|
| KafkaTestUtils.consumerProps(group, autoCommit, broker) | Build consumer properties pointing at embedded broker |
| KafkaTestUtils.getSingleRecord(consumer, topic) | Poll until exactly one record arrives (throws on timeout) |
| KafkaTestUtils.getRecords(consumer) | Poll and return all available records |
| embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic) | Subscribe consumer to embedded topic |
| Awaitility.await().untilAsserted(...) | Wait for async assertion to pass |

Key Takeaways

  • Use @EmbeddedKafka for fast CI tests — it starts in seconds and requires no Docker
  • Point Spring Boot at the embedded broker with ${spring.embedded.kafka.brokers} in @TestPropertySource
  • Use Awaitility for async listener assertions — CountDownLatch works but Awaitility produces cleaner failure messages
  • Use KafkaContainer (Testcontainers) for smoke tests that need real-broker behaviour: DLT headers, transaction isolation, and broker-level configuration
  • @DynamicPropertySource connects Testcontainers’ dynamic port to Spring’s bootstrap servers config
  • Test both the happy path and error paths (DLT routing, non-retryable exceptions) — these are the paths most likely to fail silently in production

Next: Monitoring: Consumer Lag, Micrometer Metrics, and Actuator Integration — expose Kafka consumer lag and throughput metrics via Micrometer to Prometheus and Grafana.