Reliable Event Publishing: The Transactional Outbox Pattern
There is a fundamental problem with publishing Kafka events after a database commit: if the application crashes between the commit and the publish, the event is lost forever. The Transactional Outbox Pattern solves this.
The Problem
@Transactional
public Order createOrder(CreateOrderRequest request) {
    Order order = orderRepository.save(buildOrder(request));

    OrderCreatedEvent event = OrderCreatedEvent.from(order);
    kafkaTemplate.send("order-events", event);
    // Sending inside the transaction: if the commit later fails or rolls back,
    // a phantom event has already been published.
    // Send after the commit instead, and the opposite failure appears:
    // ← Crash between commit and send → event lost, DB already committed
    // → Inventory never updated, customer never notified
    return order;
}
Two independent systems (PostgreSQL and Kafka) cannot share a single transaction. This is the two-generals problem in disguise — you cannot guarantee that both operations succeed atomically.
The Solution: Outbox Table
Instead of publishing directly to Kafka, write the event to a database table (the “outbox”) in the same transaction as your business data. A separate process reads from the outbox and publishes to Kafka.
1. Begin transaction
2. Save Order to orders table
3. Save OrderCreatedEvent to outbox table
4. Commit transaction ← atomic: both succeed or both fail
5. Outbox relay reads from outbox table
6. Publishes to Kafka
7. Marks outbox record as published
Now the event is guaranteed to exist (step 3 committed atomically with the transaction) and will eventually be published (steps 5–7 retry until successful).
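The atomicity argument in steps 1–7 can be illustrated with a minimal in-memory sketch (plain Java, no Spring or Kafka; all names here are hypothetical illustrations, not the real implementation):

```java
import java.util.ArrayList;
import java.util.List;

class OutboxSketch {
    record OutboxRow(String eventType, String payload, boolean published) {}

    static final List<String> ordersTable = new ArrayList<>();
    static final List<OutboxRow> outboxTable = new ArrayList<>();
    static final List<String> kafka = new ArrayList<>();

    // Steps 1–4: both writes succeed or neither does.
    static void createOrder(String order) {
        List<String> ordersSnapshot = new ArrayList<>(ordersTable);
        List<OutboxRow> outboxSnapshot = new ArrayList<>(outboxTable);
        try {
            ordersTable.add(order);                                       // step 2
            outboxTable.add(new OutboxRow("OrderCreated", order, false)); // step 3
        } catch (RuntimeException e) {
            // "rollback": restore both tables to their pre-transaction state
            ordersTable.clear(); ordersTable.addAll(ordersSnapshot);
            outboxTable.clear(); outboxTable.addAll(outboxSnapshot);
            throw e;
        }
    }

    // Steps 5–7: runs repeatedly; safe to crash and re-run at any point.
    static void relayOnce() {
        for (int i = 0; i < outboxTable.size(); i++) {
            OutboxRow row = outboxTable.get(i);
            if (!row.published()) {
                kafka.add(row.payload());                                 // step 6
                outboxTable.set(i,
                    new OutboxRow(row.eventType(), row.payload(), true)); // step 7
            }
        }
    }
}
```

In the real pattern both tables are durable rows in PostgreSQL, so a crash between steps 4 and 7 leaves an unpublished row behind for the next relay run to pick up.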
Implementation
Outbox Table
-- V5__create_outbox_table.sql
CREATE TABLE outbox_events (
    id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,              -- 'Order'
    aggregate_id   VARCHAR(100) NOT NULL,              -- order UUID
    event_type     VARCHAR(100) NOT NULL,              -- 'OrderCreated'
    payload        JSONB        NOT NULL,              -- event data
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMPTZ,                        -- NULL until published
    retry_count    INT          NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (created_at)
    WHERE published_at IS NULL;
Outbox Entity
@Entity
@Table(name = "outbox_events")
@Getter
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @JdbcTypeCode(SqlTypes.JSON) // Hibernate 6: maps the String to PostgreSQL jsonb
    @Column(columnDefinition = "jsonb", nullable = false)
    private String payload; // JSON string

    @Column(nullable = false)
    private Instant createdAt = Instant.now();

    private Instant publishedAt;

    private int retryCount;

    public static OutboxEvent from(String aggregateType, String aggregateId,
                                   String eventType, Object event,
                                   ObjectMapper objectMapper) {
        var outbox = new OutboxEvent();
        outbox.aggregateType = aggregateType;
        outbox.aggregateId = aggregateId;
        outbox.eventType = eventType;
        try {
            outbox.payload = objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event", e);
        }
        return outbox;
    }

    public void markPublished() {
        this.publishedAt = Instant.now();
    }

    public void incrementRetry() {
        this.retryCount++;
    }
}
Writing to Outbox (Same Transaction as Business Logic)
@Service
@RequiredArgsConstructor
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public Order createOrder(CreateOrderRequest request) {
        // 1. Save business data
        Order order = orderRepository.save(buildOrder(request));

        // 2. Save event to outbox — same transaction
        OrderCreatedEvent event = OrderCreatedEvent.from(order);
        OutboxEvent outboxEvent = OutboxEvent.from(
            "Order",
            order.getId().toString(),
            "OrderCreated",
            event,
            objectMapper
        );
        outboxRepository.save(outboxEvent);

        // Both committed atomically — no lost events possible
        return order;
    }

    public void confirmOrder(UUID orderId) {
        Order order = orderRepository.findById(orderId).orElseThrow();
        order.confirm(); // persisted by dirty checking at commit

        OrderConfirmedEvent event = OrderConfirmedEvent.from(order);
        outboxRepository.save(OutboxEvent.from(
            "Order", orderId.toString(), "OrderConfirmed", event, objectMapper));
    }
}
Outbox Relay (Polling Publisher)
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxEventRelay {

    private final OutboxEventRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Scheduled(fixedDelay = 1000) // poll every second
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> pending = outboxRepository
            .findTop100ByPublishedAtIsNullOrderByCreatedAtAsc();

        for (OutboxEvent outboxEvent : pending) {
            try {
                publish(outboxEvent);
                outboxEvent.markPublished(); // persisted by dirty checking at commit
            } catch (Exception e) {
                outboxEvent.incrementRetry();
                log.error("Failed to publish outbox event: id={}, retries={}",
                    outboxEvent.getId(), outboxEvent.getRetryCount(), e);
                if (outboxEvent.getRetryCount() >= 10) {
                    log.error("Outbox event exceeded max retries, alerting: id={}",
                        outboxEvent.getId());
                    // Alert on-call, move to dead-letter table, etc.
                }
            }
        }
        // A crash after send() but before commit re-sends the event on the next
        // poll (delivery is at-least-once), so consumers must be idempotent.
    }

    private void publish(OutboxEvent outboxEvent) throws Exception {
        Object payload = deserializePayload(outboxEvent);
        String topic = resolveTopicName(outboxEvent.getEventType());
        String key = outboxEvent.getAggregateId(); // same key → same partition → per-aggregate ordering
        kafkaTemplate.send(topic, key, payload).get(5, TimeUnit.SECONDS); // block so failures surface here
    }

    private Object deserializePayload(OutboxEvent outboxEvent) throws JsonProcessingException {
        return switch (outboxEvent.getEventType()) {
            case "OrderCreated"   -> objectMapper.readValue(outboxEvent.getPayload(), OrderCreatedEvent.class);
            case "OrderConfirmed" -> objectMapper.readValue(outboxEvent.getPayload(), OrderConfirmedEvent.class);
            case "OrderShipped"   -> objectMapper.readValue(outboxEvent.getPayload(), OrderShippedEvent.class);
            default -> throw new IllegalArgumentException("Unknown event type: " + outboxEvent.getEventType());
        };
    }

    private String resolveTopicName(String eventType) {
        return switch (eventType) {
            case "OrderCreated", "OrderConfirmed", "OrderShipped", "OrderCancelled" -> "order-events";
            case "PaymentProcessed", "PaymentFailed" -> "payment-events";
            default -> throw new IllegalArgumentException("No topic for event: " + eventType);
        };
    }
}
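The relay above retries a failing event on every poll. A common refinement, not shown in the code above, is to derive a backoff delay from retry_count so that a persistently failing event does not hammer Kafka every second (a sketch; the helper name is hypothetical):

```java
import java.time.Duration;

class OutboxBackoff {
    static final Duration BASE = Duration.ofSeconds(1);
    static final Duration MAX  = Duration.ofMinutes(5);

    // 1s, 2s, 4s, 8s, ... doubling per retry, capped at 5 minutes
    static Duration delayFor(int retryCount) {
        long seconds = BASE.getSeconds() << Math.min(retryCount, 20); // cap shift to avoid overflow
        return seconds >= MAX.getSeconds() ? MAX : Duration.ofSeconds(seconds);
    }
}
```

Using this would require recording when the last attempt happened (e.g. a hypothetical last_attempt_at column) and having the relay skip rows whose delay has not yet elapsed.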
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    List<OutboxEvent> findTop100ByPublishedAtIsNullOrderByCreatedAtAsc();

    @Modifying
    @Query("DELETE FROM OutboxEvent o WHERE o.publishedAt < :cutoff")
    void deletePublishedBefore(@Param("cutoff") Instant cutoff);
}
Outbox Cleanup
@Scheduled(cron = "0 0 3 * * *") // 3am daily
@Transactional
public void cleanupPublishedEvents() {
    Instant cutoff = Instant.now().minus(Duration.ofDays(7));
    outboxRepository.deletePublishedBefore(cutoff);
    log.info("Cleaned up published outbox events older than 7 days");
}
Optimistic Locking to Prevent Duplicate Publishing
If you run multiple application instances, multiple relays may pick up the same event:
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Version
    private Long version; // optimistic lock on the row update
    // ...
}
With @Version, when two relay threads try to update the same row, one wins and the other gets an OptimisticLockingFailureException and skips that event. Note what this actually protects: the database update. Both relays may already have sent the event to Kafka before either commits, so @Version limits duplicate publishing rather than eliminating it.
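What @Version gives you can be pictured as a compare-and-set: the update only succeeds if the row's version is still the one the updater originally read (plain-Java sketch, names hypothetical):

```java
import java.util.concurrent.atomic.AtomicLong;

class VersionedRow {
    final AtomicLong version = new AtomicLong(0);

    // Returns true if this updater "wins"; a false return is what surfaces
    // as OptimisticLockingFailureException in Spring.
    boolean markPublished(long versionRead) {
        return version.compareAndSet(versionRead, versionRead + 1);
    }
}
```

The compareAndSet models the UPDATE ... WHERE version = ? statement that Hibernate issues for a versioned entity.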
Or use SELECT FOR UPDATE SKIP LOCKED (PostgreSQL-specific):
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    // Must be called inside the relay's transaction so the row locks are
    // held until the publish commits.
    @Query(value = """
        SELECT * FROM outbox_events
        WHERE published_at IS NULL
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
        """, nativeQuery = true)
    List<OutboxEvent> findAndLockPending();
}
SKIP LOCKED — if another instance holds a lock on a row, this query skips it entirely, so each relay instance processes different rows: no duplicate publishing, no contention. The locks are released when the transaction ends, so call findAndLockPending from within the relay's @Transactional method and publish before committing.
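Even with SKIP LOCKED, delivery to Kafka is at-least-once: a crash after send() but before the outbox update commits re-sends the event on the next poll. Consumers therefore deduplicate by event id, sketched here in plain Java (a real consumer would persist processed ids in its own database rather than an in-memory set):

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentConsumer {
    private final Set<String> processedEventIds = new HashSet<>();
    int ordersCreated = 0;

    void onOrderCreated(String eventId) {
        if (!processedEventIds.add(eventId)) {
            return; // duplicate delivery, already handled
        }
        ordersCreated++; // actual business logic goes here
    }
}
```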
CDC: Debezium (Production-Grade)
Polling the outbox table has a latency of up to 1 second (poll interval). For lower latency, use Change Data Capture (CDC) with Debezium.
Debezium reads PostgreSQL’s WAL (write-ahead log) and streams every change to Kafka — including outbox inserts. Events appear in Kafka within milliseconds of the database commit.
# Debezium connector config (deployed separately)
name: order-service-outbox-connector
config:
  connector.class: io.debezium.connector.postgresql.PostgresConnector
  database.hostname: postgres
  database.port: 5432
  database.user: debezium
  database.password: ${DEBEZIUM_PASSWORD}
  database.dbname: orders
  table.include.list: public.outbox_events
  transforms: outbox
  transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
  transforms.outbox.route.by.field: aggregate_type   # column that picks the topic
  transforms.outbox.table.field.event.key: aggregate_id
  transforms.outbox.route.topic.replacement: ${routedByValue}.events
Debezium’s EventRouter transform reads the routing column (aggregate_type here) from each outbox row and derives the target Kafka topic automatically.
With CDC, your application doesn’t need a polling scheduler at all — Debezium handles the relay.
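The routing done by route.topic.replacement amounts to a template substitution; a plain-Java sketch of the idea:

```java
class EventRouterSketch {
    // The value of the routing column is substituted for ${routedByValue}
    // in the configured topic pattern.
    static String topicFor(String routedByValue, String pattern) {
        return pattern.replace("${routedByValue}", routedByValue);
    }
}
```

With the config above, an outbox row whose routing column holds Order lands on topic Order.events.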
What You’ve Learned
- Direct Kafka publishing after a DB commit risks lost events if the application crashes between the two operations
- The Outbox Pattern writes events to a database table in the same transaction — atomicity guaranteed
- A scheduler (relay) polls the outbox and publishes to Kafka — retries until successful
- SELECT FOR UPDATE SKIP LOCKED prevents multiple relay instances from publishing the same event
- Debezium CDC reads the WAL stream for millisecond-level latency and eliminates the polling scheduler
Next: Article 45 — Building a Modular Monolith with Spring Modulith — structure your application to enforce module boundaries without microservices.