Event-Driven Spring Boot: Transactional Outbox Pattern with Kafka
Publishing an event to Kafka after saving to the database looks simple. It has a subtle, dangerous flaw: if the Kafka publish fails after the DB commit, or the app crashes between the two, your event is lost and your data is inconsistent.
The Transactional Outbox Pattern solves this by writing the event to the database in the same transaction as the business data, then publishing to Kafka separately. This guide covers the pattern, the implementation, and idempotent consumers.
The Problem
// DANGEROUS — two separate operations, no atomicity
@Transactional
public Order createOrder(CreateOrderRequest request) {
Order order = orderRepository.save(new Order(request)); // DB write
kafkaTemplate.send("orders", new OrderCreatedEvent(order.getId())); // Kafka publish
// If this fails or app crashes, order exists in DB but event was never published
// Downstream services never know the order was created
return order;
}
The @Transactional annotation doesn’t help — Kafka isn’t part of the database transaction, so a failed send has nothing to roll back, and a successful commit can still be followed by a crash before the send.
The Transactional Outbox Pattern
Instead of publishing directly to Kafka, write the event to an outbox table in the same database transaction as the order. A separate publisher process reads from the outbox and publishes to Kafka. The database transaction guarantees both writes succeed or both roll back.
                    ┌─────── Single DB Transaction ────────┐
                    │                                      │
Application ───────►│ INSERT INTO orders (...)             │
                    │ INSERT INTO outbox_events (...)      │
                    │                                      │
                    └──────────────────────────────────────┘
                                     │
                                     │ (separate process)
                                     ▼
                    Outbox Publisher reads unpublished events
                                     │
                                     ▼
                                Kafka Topic
                                     │
                                     ▼
                         Consumers process event
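In SQL terms, the application half of the diagram is nothing more than two inserts committed together (illustrative placeholder values):

```sql
BEGIN;

INSERT INTO orders (id, customer_id, total)
VALUES ('7f9c0a1e-...', 'cust-42', 99.90);

INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '7f9c0a1e-...', 'OrderCreated', '{"orderId": "7f9c0a1e-..."}');

COMMIT;
```

If either insert fails, the COMMIT never happens and neither row exists.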
Implementation: Spring Boot + Spring Data
Outbox table schema
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL, -- e.g., "Order"
aggregate_id VARCHAR(255) NOT NULL, -- e.g., order ID
event_type VARCHAR(255) NOT NULL, -- e.g., "OrderCreated"
payload JSONB NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
published_at TIMESTAMP
);
CREATE INDEX idx_outbox_status ON outbox_events (status, created_at);
Outbox entity and repository
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue
private UUID id;
private String aggregateType;
private String aggregateId;
private String eventType;
@JdbcTypeCode(SqlTypes.JSON)
private String payload;
@Enumerated(EnumType.STRING)
private Status status = Status.PENDING;
private LocalDateTime createdAt;
private LocalDateTime publishedAt;
public enum Status { PENDING, PUBLISHED, FAILED }
// Reuse a single mapper — ObjectMapper is thread-safe and expensive to construct
private static final ObjectMapper MAPPER = new ObjectMapper();
public static OutboxEvent of(String aggregateType, String aggregateId,
String eventType, Object payload) {
try {
OutboxEvent event = new OutboxEvent();
event.aggregateType = aggregateType;
event.aggregateId = aggregateId;
event.eventType = eventType;
event.payload = MAPPER.writeValueAsString(payload);
event.createdAt = LocalDateTime.now();
return event;
} catch (JsonProcessingException e) {
throw new IllegalStateException("Failed to serialize event payload", e);
}
}
// getters and setters omitted for brevity
}
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
List<OutboxEvent> findTop100ByStatusOrderByCreatedAt(OutboxEvent.Status status);
}
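One caveat for multi-instance deployments: PESSIMISTIC_WRITE alone makes every publisher instance queue on the same 100 rows. Hibernate treats a lock timeout of -2 as SKIP LOCKED on databases that support it (PostgreSQL does), so concurrent pollers each claim a disjoint batch. A sketch of the repository with that hint — note the -2 value is Hibernate-specific behavior, not part of the JPA standard:

```java
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    // Becomes SELECT ... FOR UPDATE SKIP LOCKED on PostgreSQL
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints(@QueryHint(name = "jakarta.persistence.lock.timeout", value = "-2"))
    List<OutboxEvent> findTop100ByStatusOrderByCreatedAt(OutboxEvent.Status status);
}
```

With this hint, running the publisher on every instance is safe: no two instances ever process the same outbox row concurrently.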
Saving to outbox in the same transaction
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxRepository;
public OrderService(OrderRepository orderRepository, OutboxEventRepository outboxRepository) {
this.orderRepository = orderRepository;
this.outboxRepository = outboxRepository;
}
public Order createOrder(CreateOrderRequest request) {
Order order = orderRepository.save(new Order(request));
// Write event to outbox IN THE SAME TRANSACTION
outboxRepository.save(OutboxEvent.of(
"Order",
order.getId().toString(),
"OrderCreated",
new OrderCreatedPayload(order.getId(), order.getCustomerId(), order.getTotal())
));
return order;
}
}
If the transaction rolls back, both the order and the outbox entry roll back. No orphaned events.
Outbox publisher (polling)
@Component
public class OutboxPublisher {
private static final Logger log = LoggerFactory.getLogger(OutboxPublisher.class);
private final OutboxEventRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
public OutboxPublisher(OutboxEventRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
}
@Scheduled(fixedDelay = 1000) // poll every second; requires @EnableScheduling
@Transactional
public void publishPendingEvents() {
List<OutboxEvent> pending = outboxRepository
.findTop100ByStatusOrderByCreatedAt(OutboxEvent.Status.PENDING);
for (OutboxEvent event : pending) {
try {
kafkaTemplate.send(
topicFor(event.getEventType()),
event.getAggregateId(), // use as Kafka partition key
event.getPayload()
).get(5, TimeUnit.SECONDS); // wait for ack
event.setStatus(OutboxEvent.Status.PUBLISHED);
event.setPublishedAt(LocalDateTime.now());
} catch (Exception e) {
// Leave the event PENDING so the next poll retries it; in production,
// track a retry count and flip to FAILED after N attempts
log.error("Failed to publish outbox event {}: {}", event.getId(), e.getMessage());
break; // stop the batch so later events aren't published ahead of this one
}
}
}
private String topicFor(String eventType) {
return switch (eventType) {
case "OrderCreated" -> "orders.created";
case "OrderCancelled" -> "orders.cancelled";
default -> "events." + eventType.toLowerCase();
};
}
}
Kafka Setup
Dependencies
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
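The Testcontainers-based test shown later in this guide needs additional test-scope dependencies. Versions are assumed to come from the Spring Boot BOM; which of these you already have depends on your starter setup:

```xml
<!-- test-scope dependencies assumed by the Testcontainers example -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
```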
Configuration
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all # wait for all in-sync replicas to ack
      retries: 3
      properties:
        enable.idempotence: true # idempotent producer — broker discards duplicates caused by retries
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false # manual commit after processing
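The manual ack mode the consumer relies on can also be set via Spring Boot configuration instead of (or in addition to) a container factory bean:

```yaml
spring:
  kafka:
    listener:
      ack-mode: manual # offsets committed only when Acknowledgment.acknowledge() is called
```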
Idempotent Consumers
Kafka guarantees at-least-once delivery. Your consumer must handle duplicate messages without corrupting data.
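Stripped of all framework machinery, consumer-side deduplication is just "remember which event IDs you have already applied, skip the rest." A minimal, framework-free sketch — here a Set stands in for the processed_events table:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Minimal illustration of idempotent consumption: the "database" is a
// Set of processed event IDs, the "side effect" is a counter.
class DedupSketch {
    private final Set<UUID> processed = new HashSet<>();
    private int applied = 0;

    // Returns true if the event was applied, false if it was a duplicate.
    public boolean handle(UUID eventId) {
        if (!processed.add(eventId)) {
            return false; // already seen — skip side effects
        }
        applied++; // stand-in for the real side effect (reserve inventory, etc.)
        return true;
    }

    public int appliedCount() {
        return applied;
    }

    public static void main(String[] args) {
        DedupSketch consumer = new DedupSketch();
        UUID id = UUID.randomUUID();
        consumer.handle(id);
        consumer.handle(id); // redelivery of the same event
        System.out.println(consumer.appliedCount()); // prints 1
    }
}
```

The real consumer below does exactly this, except the Set is a table so the check survives restarts and is shared across instances.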
@Component
public class OrderCreatedConsumer {
private static final Logger log = LoggerFactory.getLogger(OrderCreatedConsumer.class);
private final InventoryService inventoryService;
private final ProcessedEventRepository processedEvents;
public OrderCreatedConsumer(InventoryService inventoryService,
ProcessedEventRepository processedEvents) {
this.inventoryService = inventoryService;
this.processedEvents = processedEvents;
}
@KafkaListener(topics = "orders.created", groupId = "inventory-service")
@Transactional // idempotency check, reservation, and processed-event insert commit atomically
public void handleOrderCreated(
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_KEY) String orderId,
Acknowledgment acknowledgment) {
OrderCreatedEvent event = deserialize(payload, OrderCreatedEvent.class);
// Idempotency check — have we processed this event before?
if (processedEvents.existsByEventId(event.getEventId())) {
log.info("Duplicate event ignored: {}", event.getEventId());
acknowledgment.acknowledge();
return;
}
// Process the event
inventoryService.reserveItems(event.getOrderId(), event.getItems());
// Mark as processed in the same transaction
processedEvents.save(new ProcessedEvent(event.getEventId(), Instant.now()));
// Manually acknowledge — Kafka won't commit offset until here
acknowledgment.acknowledge();
}
}
@Entity
@Table(name = "processed_events")
public class ProcessedEvent {
@Id
private UUID eventId;
private Instant processedAt;
protected ProcessedEvent() {} // required by JPA
public ProcessedEvent(UUID eventId, Instant processedAt) {
this.eventId = eventId;
this.processedAt = processedAt;
}
}
The processed_events table check is the idempotency key. If the consumer crashes after processing but before committing the offset, Kafka re-delivers the message. The second attempt finds the event in processed_events and skips it.
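Both bookkeeping tables grow without bound, so schedule a purge. A sketch — the 7-day window is an arbitrary choice; keep processed_events rows at least as long as Kafka could plausibly redeliver an old message (your topic retention is the upper bound):

```sql
-- Purge bookkeeping rows older than an (arbitrary) 7-day window.
-- Run periodically via cron, pg_cron, or a @Scheduled method.
DELETE FROM processed_events
WHERE processed_at < NOW() - INTERVAL '7 days';

-- Published outbox rows are only needed for auditing/debugging.
DELETE FROM outbox_events
WHERE status = 'PUBLISHED'
  AND published_at < NOW() - INTERVAL '7 days';
```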
CDC with Debezium (Alternative to Polling)
The polling outbox publisher works but adds latency (up to the poll interval — one second here) and steady DB load. Debezium reads PostgreSQL’s write-ahead log (WAL) and streams inserts into the outbox table to Kafka in near real time:
# Debezium connector configuration (via Kafka Connect).
# database.user/password and topic.prefix are illustrative placeholders.
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "debezium",
"database.dbname": "ordersdb",
"topic.prefix": "ordersdb",
"table.include.list": "public.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.route.topic.replacement": "${routedByValue}"
}
}
With Debezium:
- No polling delay — events typically reach Kafka in under 100 ms
- No scheduled job needed
- Outbox table acts as a durable buffer even if Kafka is down
- CDC handles all INSERT/UPDATE/DELETE from the WAL
Tradeoff: Debezium adds operational complexity (Kafka Connect cluster). Use polling for simpler setups; migrate to Debezium when latency or DB poll load becomes an issue.
Kafka Consumer Error Handling
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// Dead letter topic after 3 retries
factory.setCommonErrorHandler(
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".dlq", record.partition())),
new FixedBackOff(1000L, 3L) // retry 3 times with 1s delay
)
);
return factory;
}
Failed messages go to orders.created.dlq once retries are exhausted. Monitor the DLQ — messages there were never successfully processed and need manual investigation.
Testing with Testcontainers
@SpringBootTest
@Testcontainers
class OrderCreatedConsumerTest extends AbstractIntegrationTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.6.0")
);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private InventoryRepository inventoryRepository;
@Test
void shouldReserveInventoryOnOrderCreated() throws Exception {
var event = new OrderCreatedEvent(UUID.randomUUID(), List.of(
new OrderItem("PROD-1", 2)
));
kafkaTemplate.send("orders.created", event.getOrderId().toString(),
objectMapper.writeValueAsString(event));
// Wait for consumer to process
await().atMost(10, SECONDS)
.until(() -> inventoryRepository.findReservation(event.getOrderId()).isPresent());
var reservation = inventoryRepository.findReservation(event.getOrderId()).get();
assertThat(reservation.getItems()).hasSize(1);
}
}
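A useful companion is a duplicate-delivery test. This sketch assumes the same harness and that OrderCreatedEvent carries a stable event ID the consumer uses as its idempotency key; delivering the identical record twice must leave exactly one reservation:

```java
@Test
void shouldIgnoreDuplicateOrderCreatedEvent() throws Exception {
    var event = new OrderCreatedEvent(UUID.randomUUID(), List.of(
        new OrderItem("PROD-1", 2)
    ));
    String json = objectMapper.writeValueAsString(event);

    // Simulate at-least-once delivery: the same record arrives twice
    kafkaTemplate.send("orders.created", event.getOrderId().toString(), json);
    kafkaTemplate.send("orders.created", event.getOrderId().toString(), json);

    await().atMost(10, SECONDS)
        .until(() -> inventoryRepository.findReservation(event.getOrderId()).isPresent());

    // One reservation, reserved once — the second delivery was skipped
    var reservation = inventoryRepository.findReservation(event.getOrderId()).get();
    assertThat(reservation.getItems()).hasSize(1);
}
```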
Quick Reference
// Save order + outbox event in one transaction
@Transactional
public Order createOrder(CreateOrderRequest req) {
Order order = orderRepository.save(new Order(req));
outboxRepository.save(OutboxEvent.of("Order", order.getId().toString(), "OrderCreated", payload));
return order;
}
// Idempotent consumer
@KafkaListener(topics = "orders.created")
public void handle(String payload, Acknowledgment ack) {
if (processedEvents.existsByEventId(eventId)) { ack.acknowledge(); return; }
// process...
processedEvents.save(new ProcessedEvent(eventId));
ack.acknowledge();
}
Summary
The transactional outbox pattern solves the dual-write problem: write the event to the database in the same transaction as the business data, then publish to Kafka separately. This guarantees no events are lost even if the app crashes. Use a polling publisher for simplicity; upgrade to Debezium CDC for sub-100ms latency at scale. Make consumers idempotent with a processed_events table and manual offset commit — Kafka’s at-least-once guarantee means your consumers will receive duplicates, and that must be safe.
