Part 35 of 37
Testing Kafka Applications: EmbeddedKafka and Testcontainers
Two Testing Strategies
| Strategy | Speed | Fidelity | Use when |
|---|---|---|---|
@EmbeddedKafka | Fast (~2s startup) | In-process broker, not 100% identical | Unit/integration tests — CI fast path |
KafkaContainer (Testcontainers) | Slower (~10s startup) | Real Kafka broker in Docker | Acceptance tests, DLT/transaction validation |
Use both: @EmbeddedKafka for the bulk of tests, KafkaContainer for the smoke suite that validates real-broker behaviour.
Test Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
@EmbeddedKafka — Fast Integration Tests
Testing a Producer
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = {"orders"},
brokerProperties = {"log.dir=target/kafka-test-logs"}
)
class OrderPublisherTest {
@Autowired
private OrderEventPublisher publisher;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldPublishOrderEvent() throws Exception {
// Set up a consumer to read what was published
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafka
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders");
// Publish
OrderPlacedEvent event = new OrderPlacedEvent(
UUID.randomUUID().toString(), "customer-1", 149.99, Instant.now().toEpochMilli()
);
publisher.publish(event);
// Assert
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "orders");
assertThat(record.value()).contains(event.getOrderId());
}
}
Testing a Consumer (Listener)
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
topics = {"orders", "inventory-events"}
)
class InventoryEventListenerTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
@MockBean
private InventoryService inventoryService;
@Test
void shouldCallInventoryServiceOnOrderEvent() {
OrderPlacedEvent event = new OrderPlacedEvent(
"order-123", "customer-1", 99.99, Instant.now().toEpochMilli()
);
kafkaTemplate.send("orders", event.getOrderId(), event);
// Wait for async listener to process
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() ->
verify(inventoryService, times(1)).reserveStock(any(OrderPlacedEvent.class))
);
}
}
Overriding Bootstrap Servers
Use @TestPropertySource to point Spring Boot at the embedded broker:
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "orders")
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class OrderListenerTest {
// spring.embedded.kafka.brokers is set automatically by @EmbeddedKafka
}
CountDownLatch for Listener Synchronization
When you can’t mock the service (e.g., testing the full flow), use a CountDownLatch:
@SpringBootTest
@EmbeddedKafka(topics = "orders")
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class FullFlowTest {
@Autowired
private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
@Autowired
private TestListener testListener;
@Test
void shouldProcessOrderEndToEnd() throws Exception {
kafkaTemplate.send("orders", "order-1",
new OrderPlacedEvent("order-1", "customer-1", 50.0, Instant.now().toEpochMilli())
);
boolean received = testListener.latch.await(10, TimeUnit.SECONDS);
assertThat(received).isTrue();
assertThat(testListener.lastOrderId).isEqualTo("order-1");
}
}
@Component
class TestListener {
final CountDownLatch latch = new CountDownLatch(1);
volatile String lastOrderId;
@KafkaListener(topics = "orders", groupId = "test-group")
public void onOrder(OrderPlacedEvent event) {
lastOrderId = event.getOrderId();
latch.countDown();
}
}
Testcontainers — Real Kafka Broker
@SpringBootTest
@Testcontainers
class OrderServiceIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.6.0")
);
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired
private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
@Autowired
private InventoryService inventoryService;
@Test
void shouldProcessOrderWithRealKafka() {
OrderPlacedEvent event = new OrderPlacedEvent(
"order-42", "customer-7", 299.99, Instant.now().toEpochMilli()
);
kafkaTemplate.send("orders", event.getOrderId(), event);
await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(() ->
assertThat(inventoryService.isReserved("order-42")).isTrue()
);
}
}
@DynamicPropertySource injects the container’s bootstrap servers before the Spring context starts — no hardcoded ports.
Testing Dead Letter Topics
Verify that a non-retryable exception routes the record to the DLT:
@SpringBootTest
@EmbeddedKafka(topics = {"orders", "orders.DLT"})
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class DeadLetterTest {
@Autowired
private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@MockBean
private InventoryService inventoryService;
@Test
void shouldRouteToDetOnNonRetryableException() throws Exception {
// Make the inventory service throw a non-retryable exception
doThrow(new OrderValidationException("Invalid order"))
.when(inventoryService).reserveStock(any());
kafkaTemplate.send("orders", "order-bad",
new OrderPlacedEvent("order-bad", "customer-1", 0.0, Instant.now().toEpochMilli())
);
// Read from DLT
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"dlt-test-group", "true", embeddedKafka);
KafkaConsumer<String, String> dltConsumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(dltConsumer, "orders.DLT");
ConsumerRecord<String, String> dltRecord =
KafkaTestUtils.getSingleRecord(dltConsumer, "orders.DLT",
Duration.ofSeconds(10).toMillis());
assertThat(dltRecord).isNotNull();
assertThat(dltRecord.value()).contains("order-bad");
}
}
Testing Kafka Transactions
@SpringBootTest
@EmbeddedKafka(topics = {"orders", "orders-confirmed"})
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
class TransactionTest {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldRollbackOnException() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"tx-test-group", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders-confirmed");
// Simulated rollback — no records should reach orders-confirmed
assertThatThrownBy(() ->
kafkaTemplate.executeInTransaction(ops -> {
ops.send("orders-confirmed", "order-1", "{}");
throw new RuntimeException("Simulated failure");
})
).isInstanceOf(RuntimeException.class);
// With read_committed, rolled-back records are invisible
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(3));
assertThat(records.count()).isZero();
}
}
Test Utilities Reference
| Utility | Purpose |
|---|---|
KafkaTestUtils.consumerProps(group, autoCommit, broker) | Build consumer properties pointing at embedded broker |
KafkaTestUtils.getSingleRecord(consumer, topic) | Poll until exactly one record arrives (throws on timeout) |
KafkaTestUtils.getRecords(consumer) | Poll and return all available records |
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic) | Subscribe consumer to embedded topic |
Awaitility.await().untilAsserted(...) | Wait for async assertion to pass |
Key Takeaways
- Use
@EmbeddedKafkafor fast CI tests — it starts in seconds and requires no Docker - Point Spring Boot at the embedded broker with
${spring.embedded.kafka.brokers}in@TestPropertySource - Use
Awaitilityfor async listener assertions —CountDownLatchworks but Awaitility produces cleaner failure messages - Use
KafkaContainer(Testcontainers) for smoke tests that need real-broker behaviour: DLT headers, transaction isolation, and broker-level configuration @DynamicPropertySourceconnects Testcontainers’ dynamic port to Spring’s bootstrap servers config- Test both the happy path and error paths (DLT routing, non-retryable exceptions) — these are the paths most likely to fail silently in production
Next: Monitoring: Consumer Lag, Micrometer Metrics, and Actuator Integration — expose Kafka consumer lag and throughput metrics via Micrometer to Prometheus and Grafana.