Kafka Transactions and Exactly-Once Semantics

Why Transactions?

At-least-once delivery means a record can be processed and produced more than once after a crash. For most applications, idempotent consumers handle this. But when you need a hard guarantee — either the produce happens and the offset commits, or neither does — you need Kafka transactions.

Common scenarios:

  • Consume → transform → produce (read from one topic, write to another) where partial completion is unacceptable
  • Exactly-once aggregations in financial or billing systems
  • Atomic multi-topic produce where records to multiple topics must all land or none land

How Kafka Transactions Work

sequenceDiagram
    participant Producer
    participant Broker
    participant Consumer

    Producer->>Broker: initTransactions() [registers transactional.id]
    Producer->>Broker: beginTransaction()
    Producer->>Broker: send(orders-confirmed, record1)
    Producer->>Broker: send(inventory-reserved, record2)
    Producer->>Broker: sendOffsetsToTransaction([orders-0: offset 42], groupId)
    Producer->>Broker: commitTransaction()
    Note over Broker: records visible atomically\noffset 42 committed atomically

    alt Crash before commitTransaction
        Producer->>Broker: abortTransaction() [or broker aborts on timeout]
        Note over Broker: records and offset commit\nrolled back atomically
    end

sendOffsetsToTransaction is the key call — it commits the consumer offset as part of the producer transaction, giving true exactly-once consume-transform-produce semantics.
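The sequence above maps directly onto the plain Kafka clients. A minimal sketch of the loop, assuming a KafkaConsumer already subscribed to orders with enable.auto.commit=false, a transactional KafkaProducer, and an illustrative transform helper (none of these are from the original text):

```java
producer.initTransactions(); // one-time: registers transactional.id, fences zombies

while (running) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(250));
    if (batch.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> rec : batch) {
            producer.send(new ProducerRecord<>("orders-confirmed",
                    rec.key(), transform(rec.value())));
            // +1: the committed offset is the next record to read, not the last one processed
            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                    new OffsetAndMetadata(rec.offset() + 1));
        }
        // The key call: consumer offsets ride inside the producer transaction
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (ProducerFencedException fenced) {
        producer.close(); // another instance took over — do not abort, just exit
        throw fenced;
    } catch (KafkaException e) {
        producer.abortTransaction(); // records and offsets roll back together
    }
}
```

Spring Kafka runs essentially this loop for you when a KafkaTransactionManager is wired into the listener container, as shown later.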


Producer Configuration

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    // Required for transactions; Spring Kafka treats this value as a prefix
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-");
    // Idempotence is required for transactions
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // acks=all is required for idempotence
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    DefaultKafkaProducerFactory<String, Object> factory =
        new DefaultKafkaProducerFactory<>(props);
    return factory;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

transactional.id uniquely identifies the producer across restarts. If the service restarts with the same transactional.id, Kafka fences the previous producer instance and resolves any in-flight transactions. Use a prefix like order-service-tx- — Spring Kafka treats the configured value as a prefix and appends a unique suffix per producer instance.
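Equivalently, the prefix can be set on the factory itself rather than in the raw properties — a small variation of the bean above (sketch; the prefix value is the same illustrative one):

```java
DefaultKafkaProducerFactory<String, Object> factory =
    new DefaultKafkaProducerFactory<>(props);
// Same effect as putting TRANSACTIONAL_ID_CONFIG in props:
// Spring appends a unique per-instance suffix to this prefix
factory.setTransactionIdPrefix("order-service-tx-");
return factory;
```

Setting the prefix this way makes it explicit that the factory is transaction-capable, independent of what is in the property map.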


KafkaTransactionManager

@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

This integrates with Spring’s @Transactional infrastructure — annotate any method with @Transactional("kafkaTransactionManager") and all KafkaTemplate.send() calls inside are wrapped in a transaction.
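For producer-only transactions outside any listener or @Transactional method, KafkaTemplate also offers executeInTransaction, which begins a local transaction, runs the callback, and commits (or aborts on exception). A sketch, with topic names and event variables assumed for illustration:

```java
kafkaTemplate.executeInTransaction(ops -> {
    ops.send("orders-confirmed", orderId, confirmedEvent);
    ops.send("inventory-events", orderId, reservedEvent);
    return true; // both sends become visible together, or neither lands
});
```

This is useful for batch jobs or outbox-style publishers that are not driven by a Kafka listener.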


Consumer Isolation Level

Set the consumer to read only committed records:

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

With read_committed, the consumer sees records from transactional producers only after the transaction is committed. Aborted records are invisible.

flowchart LR
    subgraph Broker["Topic Partition"]
        R40["offset 40 (committed)"]
        R41["offset 41 (in-flight transaction)"]
        R42["offset 42 (committed)"]
        R43["offset 43 (aborted)"]
    end

    subgraph Consumers
        RC["read_committed consumer\nsees: 40, 42 (skips 41 until commit, skips 43)"]
        RU["read_uncommitted consumer\nsees: 40, 41, 42, 43"]
    end

    Broker --> RC
    Broker --> RU

Consume-Transform-Produce Pattern

The most common transactional pattern: consume from orders, process, produce to orders-confirmed, commit offset — all atomically.

@Service
@RequiredArgsConstructor
public class OrderConfirmationService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final InventoryService inventoryService;

    @Transactional("kafkaTransactionManager")
    public void confirmOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
        OrderPlacedEvent placed = record.value();

        // Business logic
        inventoryService.reserveStock(placed);

        // Produce to output topic — inside the transaction
        OrderConfirmedEvent confirmed = new OrderConfirmedEvent(
            placed.getOrderId(),
            placed.getCustomerId(),
            Instant.now().toEpochMilli()
        );
        kafkaTemplate.send("orders-confirmed", placed.getOrderId(), confirmed);

        // Offset is committed as part of the transaction — not separately
    }
}

Wire this into the listener container by setting the transaction manager on the container properties:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
        transactionalListenerContainerFactory(
                ConsumerFactory<String, Object> consumerFactory,
                KafkaTransactionManager<String, Object> txManager) {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory);
    // Newer Spring Kafka versions prefer setKafkaAwareTransactionManager
    factory.getContainerProperties().setTransactionManager(txManager);
    return factory;
}

When a KafkaTransactionManager is set on the container, Spring Kafka:

  1. Begins a transaction before calling the listener
  2. Sends the consumer offset to the transaction via sendOffsetsToTransaction
  3. Commits the transaction after the listener returns successfully
  4. Aborts on exception — offset is not committed, record will be redelivered
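Because an abort means redelivery, retries should be bounded. One way (a sketch; the `<topic>.DLT` target is Spring Kafka's DeadLetterPublishingRecoverer default, and the backoff values are arbitrary) is a DefaultAfterRollbackProcessor on the container factory:

```java
@Bean
public AfterRollbackProcessor<String, Object> afterRollbackProcessor(
        KafkaTemplate<String, Object> kafkaTemplate) {
    // After the retries are exhausted, publish the failed record to <topic>.DLT
    return new DefaultAfterRollbackProcessor<>(
        new DeadLetterPublishingRecoverer(kafkaTemplate),
        new FixedBackOff(1000L, 2L)); // 1s between attempts, 2 retries
}
```

Attach it with factory.setAfterRollbackProcessor(afterRollbackProcessor) so a poison-pill record cannot keep the partition stuck in an abort-redeliver loop.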

Exactly-Once with @Transactional

@KafkaListener(
    topics = "orders",
    groupId = "order-confirmation-service",
    containerFactory = "transactionalListenerContainerFactory"
)
@Transactional("kafkaTransactionManager")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
    OrderPlacedEvent event = record.value();

    // Both sends and offset commit happen atomically
    kafkaTemplate.send("orders-confirmed", event.getOrderId(),
        new OrderConfirmedEvent(event.getOrderId()));
    kafkaTemplate.send("inventory-events", event.getOrderId(),
        new StockReservedEvent(event.getOrderId(), event.getItems()));
}

If either send fails or the method throws, the transaction aborts — no records land in orders-confirmed or inventory-events, and the offset on orders is not committed.


Transactional ID Fencing

If an application instance crashes mid-transaction and restarts, Kafka uses the transactional.id to fence the zombie:

sequenceDiagram
    participant OldProducer as "Old Producer (epoch=5)"
    participant NewProducer as "New Producer (epoch=6)"
    participant Broker

    OldProducer->>Broker: beginTransaction()
    OldProducer->>Broker: send(record1)
    Note over OldProducer: CRASH 💥
    NewProducer->>Broker: initTransactions() [same transactional.id]
    Broker->>Broker: bump epoch to 6, fence epoch 5
    NewProducer->>Broker: beginTransaction() [epoch=6]
    OldProducer->>Broker: commitTransaction() [epoch=5 — REJECTED]
    Note over Broker: Old transaction aborted\nNew transaction can proceed

EOS Modes

Spring Kafka supports two EOS modes:

Mode | Setting | Behaviour
--- | --- | ---
V1 (Alpha) | EOSMode.V1 | Uses sendOffsetsToTransaction on every record
V2 (Beta, default) | EOSMode.V2 | More efficient; uses one transaction per poll batch

V2 is the default since Spring Kafka 2.6 and works with Kafka 2.5+. Use V1 only if running an older broker.
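The mode can also be pinned explicitly on the container factory — normally unnecessary since V2 is the default, but shown here for completeness (sketch, same factory bean style as above):

```java
factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.V2);
```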


When NOT to Use Transactions

  • Simple consumers with idempotent processing — at-least-once + idempotent logic is simpler and faster
  • High-throughput fire-and-forget topics — transactions add latency (~30–50ms per commit round trip)
  • Multiple consumer group members writing to different partitions — transactions work per producer instance; ensure each instance has a unique transactional.id

Key Takeaways

  • Kafka transactions make produce + offset commit atomic — either both happen or neither does
  • Set transactional.id, enable.idempotence=true, and acks=all on the producer — they are all required together
  • Set isolation.level=read_committed on consumers to skip uncommitted and aborted records
  • Wire KafkaTransactionManager into the listener container factory to enable EOS consume-transform-produce
  • Kafka fences zombie producers with the same transactional.id using epoch numbers
  • Use transactions only where exactly-once is genuinely needed — at-least-once + idempotency is sufficient for most applications

Next: @SendTo and @KafkaHandler: Chaining Consumers and Multi-Type Dispatch — forward listener output to another topic with @SendTo and dispatch multiple event types to separate handler methods with @KafkaHandler.