Reliable Event Publishing: The Transactional Outbox Pattern

There is a fundamental problem with publishing Kafka events after a database commit: if the application crashes between the commit and the publish, the event is lost forever. The Transactional Outbox Pattern solves this.

The Problem

// Deliberately NOT @Transactional: save() commits in its own transaction
public Order createOrder(CreateOrderRequest request) {
    Order order = orderRepository.save(buildOrder(request));
    // DB committed ✓

    kafkaTemplate.send("order-events", OrderCreatedEvent.from(order));
    // ← Crash here → event lost, DB already committed
    // → Inventory never updated, customer never notified

    return order;
}

Two separate systems (PostgreSQL and Kafka) can’t share a single ACID transaction; Kafka does not participate in XA/two-phase commit. Wrapping both calls in @Transactional doesn’t help either: the send then runs before the commit, so a rollback leaves a “ghost” event already in Kafka. This is a variant of the two-generals problem: you can’t guarantee that both operations succeed atomically.

The Solution: Outbox Table

Instead of publishing directly to Kafka, write the event to a database table (the “outbox”) in the same transaction as your business data. A separate process reads from the outbox and publishes to Kafka.

1. Begin transaction
2. Save Order to orders table
3. Save OrderCreatedEvent to outbox table
4. Commit transaction  ← atomic: both succeed or both fail

5. Outbox relay reads from outbox table
6. Publishes to Kafka
7. Marks outbox record as published

Now the event is guaranteed to exist (step 3 commits atomically with the transaction) and will eventually be published (steps 5–7 retry until they succeed). The trade-off is at-least-once delivery: if the relay crashes between steps 6 and 7, the event is published again on the next poll, so consumers must be idempotent.

Implementation

Outbox Table

-- V5__create_outbox_table.sql
CREATE TABLE outbox_events (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,    -- 'Order'
    aggregate_id    VARCHAR(100) NOT NULL,    -- order UUID
    event_type      VARCHAR(100) NOT NULL,    -- 'OrderCreated'
    payload         JSONB        NOT NULL,    -- event data
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,              -- null until published
    retry_count     INT          NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (created_at)
    WHERE published_at IS NULL;

Outbox Entity

@Entity
@Table(name = "outbox_events")
@Getter
public class OutboxEvent {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @JdbcTypeCode(SqlTypes.JSON)   // Hibernate 6: without this, binding a String to a jsonb column fails
    @Column(columnDefinition = "jsonb", nullable = false)
    private String payload;    // JSON string

    @Column(nullable = false)
    private Instant createdAt = Instant.now();

    private Instant publishedAt;

    private int retryCount;

    public static OutboxEvent from(String aggregateType, String aggregateId,
                                   String eventType, Object event,
                                   ObjectMapper objectMapper) {
        var outbox = new OutboxEvent();
        outbox.aggregateType = aggregateType;
        outbox.aggregateId = aggregateId;
        outbox.eventType = eventType;
        try {
            outbox.payload = objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
        return outbox;
    }

    public void markPublished() {
        this.publishedAt = Instant.now();
    }

    public void incrementRetry() {
        this.retryCount++;
    }
}
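The service code below calls OrderCreatedEvent.from(order), which this article never defines. A minimal sketch of such a payload class, with assumed field names (they are not taken from the article), might be a plain record:

```java
import java.math.BigDecimal;
import java.time.Instant;

// Minimal sketch of an event payload. Field names are assumptions; the real
// from(Order) factory would copy these values from the Order entity.
record OrderCreatedEvent(
        String orderId,
        String customerId,
        BigDecimal totalAmount,
        Instant occurredAt) {

    // Hypothetical factory taking raw values so the sketch stays self-contained
    static OrderCreatedEvent of(String orderId, String customerId,
                                BigDecimal totalAmount) {
        return new OrderCreatedEvent(orderId, customerId, totalAmount, Instant.now());
    }
}
```

A record like this serializes cleanly via ObjectMapper.writeValueAsString into the JSONB payload column.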

Writing to Outbox (Same Transaction as Business Logic)

@Service
@RequiredArgsConstructor
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public Order createOrder(CreateOrderRequest request) {
        // 1. Save business data
        Order order = orderRepository.save(buildOrder(request));

        // 2. Save event to outbox — same transaction
        OrderCreatedEvent event = OrderCreatedEvent.from(order);
        OutboxEvent outboxEvent = OutboxEvent.from(
            "Order",
            order.getId().toString(),
            "OrderCreated",
            event,
            objectMapper
        );
        outboxRepository.save(outboxEvent);

        // Both committed atomically — no lost events possible
        return order;
    }

    public void confirmOrder(UUID orderId) {
        Order order = orderRepository.findById(orderId).orElseThrow();
        order.confirm();

        OrderConfirmedEvent event = OrderConfirmedEvent.from(order);
        outboxRepository.save(OutboxEvent.from(
            "Order", orderId.toString(), "OrderConfirmed", event, objectMapper));
    }
}

Outbox Relay (Polling Publisher)

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxEventRelay {

    private final OutboxEventRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Scheduled(fixedDelay = 1000)     // poll every second
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> pending = outboxRepository
            .findTop100ByPublishedAtIsNullOrderByCreatedAtAsc();

        for (OutboxEvent outboxEvent : pending) {
            try {
                publish(outboxEvent);
                outboxEvent.markPublished();
            } catch (Exception e) {
                outboxEvent.incrementRetry();
                log.error("Failed to publish outbox event: id={}, retries={}",
                    outboxEvent.getId(), outboxEvent.getRetryCount(), e);

                if (outboxEvent.getRetryCount() >= 10) {
                    log.error("Outbox event exceeded max retries, alerting: id={}",
                        outboxEvent.getId());
                    // Alert on-call, move to dead-letter table, etc.
                }
            }
        }
    }

    private void publish(OutboxEvent outboxEvent) throws Exception {
        Object payload = deserializePayload(outboxEvent);

        String topic = resolveTopicName(outboxEvent.getEventType());
        String key = outboxEvent.getAggregateId();

        kafkaTemplate.send(topic, key, payload).get(5, TimeUnit.SECONDS);
    }

    private Object deserializePayload(OutboxEvent outboxEvent) throws JsonProcessingException {
        return switch (outboxEvent.getEventType()) {
            case "OrderCreated"   -> objectMapper.readValue(outboxEvent.getPayload(), OrderCreatedEvent.class);
            case "OrderConfirmed" -> objectMapper.readValue(outboxEvent.getPayload(), OrderConfirmedEvent.class);
            case "OrderShipped"   -> objectMapper.readValue(outboxEvent.getPayload(), OrderShippedEvent.class);
            default -> throw new IllegalArgumentException("Unknown event type: " + outboxEvent.getEventType());
        };
    }

    private String resolveTopicName(String eventType) {
        return switch (eventType) {
            case "OrderCreated", "OrderConfirmed", "OrderShipped", "OrderCancelled" -> "order-events";
            case "PaymentProcessed", "PaymentFailed" -> "payment-events";
            default -> throw new IllegalArgumentException("No topic for event: " + eventType);
        };
    }
}
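The relay above retries every failed event on every poll, i.e. once per second. If the broker is down for a while, a capped exponential backoff is gentler. A sketch of the backoff arithmetic; note it assumes an extra last_attempt_at column that the article’s schema does not have:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: capped exponential backoff for outbox retries (an extension, not
// part of the article's relay). An event becomes eligible again only after
// 2^retryCount seconds have passed, capped at 5 minutes.
final class OutboxBackoff {

    private static final Duration MAX_BACKOFF = Duration.ofMinutes(5);

    static Duration backoffFor(int retryCount) {
        long seconds = 1L << Math.min(retryCount, 30);   // clamp shift to avoid overflow
        Duration d = Duration.ofSeconds(seconds);
        return d.compareTo(MAX_BACKOFF) > 0 ? MAX_BACKOFF : d;
    }

    static boolean eligible(Instant lastAttempt, int retryCount, Instant now) {
        return !now.isBefore(lastAttempt.plus(backoffFor(retryCount)));
    }
}
```

The relay would then skip events whose backoff window hasn’t elapsed instead of hammering Kafka with them every cycle.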

Outbox Repository

@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    List<OutboxEvent> findTop100ByPublishedAtIsNullOrderByCreatedAtAsc();

    @Modifying
    @Query("DELETE FROM OutboxEvent o WHERE o.publishedAt < :cutoff")
    void deletePublishedBefore(@Param("cutoff") Instant cutoff);
}

Outbox Cleanup

@Scheduled(cron = "0 0 3 * * *")   // 3am daily
@Transactional
public void cleanupPublishedEvents() {
    Instant cutoff = Instant.now().minus(Duration.ofDays(7));
    outboxRepository.deletePublishedBefore(cutoff);
    log.info("Cleaned up published outbox events older than 7 days");
}

Optimistic Locking to Prevent Duplicate Publishing

If you run multiple application instances, multiple relays may pick up the same event:

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Version
    private Long version;   // optimistic lock — prevents concurrent publishing

    // ...
}

With @Version, when two relay threads try to update the same row, one wins; the other gets an OptimisticLockingFailureException and skips that event.
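At the SQL level, @Version turns the update into a compare-and-set: UPDATE ... SET version = version + 1 WHERE id = ? AND version = ?, where zero updated rows means another thread won. A plain-Java sketch of that semantics (illustrative only, not JPA code):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the compare-and-set that @Version performs at the SQL level:
// the update only applies if the row still carries the version the caller read.
final class VersionedRow {

    private final AtomicLong version = new AtomicLong(0);
    private volatile boolean published = false;

    // Returns true if this caller "won" the update; false means the row changed
    // underneath it (JPA would throw OptimisticLockingFailureException instead).
    boolean markPublished(long expectedVersion) {
        if (version.compareAndSet(expectedVersion, expectedVersion + 1)) {
            published = true;
            return true;
        }
        return false;
    }

    long version() { return version.get(); }
    boolean published() { return published; }
}
```

The losing relay simply moves on: the event is already being handled by the winner.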

Or use SELECT FOR UPDATE SKIP LOCKED (PostgreSQL-specific):

@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    @Query(value = """
        SELECT * FROM outbox_events
        WHERE published_at IS NULL
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
        """, nativeQuery = true)
    List<OutboxEvent> findAndLockPending();
}

SKIP LOCKED — if another instance has locked a row, this query skips it entirely. Each relay instance processes different rows. No duplicate publishing, no contention.

CDC: Debezium (Production-Grade)

Polling the outbox table has a latency of up to 1 second (poll interval). For lower latency, use Change Data Capture (CDC) with Debezium.

Debezium reads PostgreSQL’s WAL (write-ahead log) and streams every change to Kafka — including outbox inserts. Events appear in Kafka within milliseconds of the database commit.

# Debezium connector config (deployed separately)
name: order-service-outbox-connector
config:
  connector.class: io.debezium.connector.postgresql.PostgresConnector
  database.hostname: postgres
  database.port: 5432
  database.user: debezium
  database.password: ${DEBEZIUM_PASSWORD}
  database.dbname: orders
  table.include.list: public.outbox_events
  transforms: outbox
  transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
  transforms.outbox.route.by.field: aggregate_type     # default is 'aggregatetype'; our column is snake_case
  transforms.outbox.table.field.event.key: aggregate_id
  transforms.outbox.route.topic.replacement: ${routedByValue}.events

Debezium’s EventRouter transform reads the routing column (aggregate_type here) from each row and routes the event to the matching Kafka topic automatically.
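The routing itself is plain substitution: the value of the routing column replaces ${routedByValue} in route.topic.replacement. A plain-Java sketch of that substitution (illustrative, not Debezium code):

```java
// Sketch of the EventRouter's topic substitution: an 'Order' row combined with
// the template '${routedByValue}.events' yields the topic 'Order.events'.
final class OutboxTopicRouter {
    static String routeTopic(String template, String routedByValue) {
        return template.replace("${routedByValue}", routedByValue);
    }
}
```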

With CDC, your application doesn’t need a polling scheduler at all — Debezium handles the relay.

What You’ve Learned

  • Direct Kafka publishing after a DB commit risks lost events if the application crashes between the two operations
  • The Outbox Pattern writes events to a database table in the same transaction — atomicity guaranteed
  • A scheduler (relay) polls the outbox and publishes to Kafka — retries until successful
  • SELECT FOR UPDATE SKIP LOCKED prevents multiple relay instances from publishing the same event
  • Debezium CDC reads the WAL stream, cutting latency to milliseconds and eliminating the polling scheduler

Next: Article 45 — Building a Modular Monolith with Spring Modulith — structure your application to enforce module boundaries without microservices.