Kafka Transactions and Exactly-Once Semantics
Why Transactions?
At-least-once delivery means a record can be processed and produced more than once after a crash. For most applications, idempotent consumers handle this. But when you need a hard guarantee — either the produce happens and the offset commits, or neither does — you need Kafka transactions.
Common scenarios:
- Consume → transform → produce (read from one topic, write to another) where partial completion is unacceptable
- Exactly-once aggregations in financial or billing systems
- Atomic multi-topic produce where records to multiple topics must all land or none land
How Kafka Transactions Work
sequenceDiagram
participant Producer
participant Broker
participant Consumer
Producer->>Broker: initTransactions() [registers transactional.id]
Producer->>Broker: beginTransaction()
Producer->>Broker: send(orders-confirmed, record1)
Producer->>Broker: send(inventory-reserved, record2)
Producer->>Broker: sendOffsetsToTransaction([orders-0: offset 42], groupId)
Producer->>Broker: commitTransaction()
Note over Broker: records visible atomically<br/>offset 42 committed atomically
alt Crash before commitTransaction
Producer->>Broker: abortTransaction() [or broker aborts on timeout]
Note over Broker: records and offset commit<br/>rolled back atomically
end
sendOffsetsToTransaction is the key call — it commits the consumer offset as part of the producer transaction, giving true exactly-once consume-transform-produce semantics.
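The calls in the diagram map directly onto the raw kafka-clients API. A minimal sketch of the loop, assuming kafka-clients 2.5+ (for `consumer.groupMetadata()`), a consumer already subscribed to `orders`, and illustrative topic names:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalRelay {

    // Consume from "orders", transform, produce to "orders-confirmed",
    // committing the consumed offsets inside the same transaction.
    public static void run(KafkaConsumer<String, String> consumer,
                           KafkaProducer<String, String> producer) {
        producer.initTransactions(); // registers transactional.id, resolves stale transactions
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;
            try {
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("orders-confirmed",
                            record.key(), record.value().toUpperCase()));
                    // Commit the NEXT offset to read, not the one just processed
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Offsets commit atomically with the produced records
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                producer.close(); // fenced by a newer instance: unrecoverable, must close
                return;
            } catch (KafkaException e) {
                producer.abortTransaction(); // records and offsets roll back together
            }
        }
    }
}
```

Spring Kafka performs exactly this sequence for you when a transaction manager is wired into the listener container, which the rest of this section sets up.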
Producer Configuration
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Required for transactions
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-");
// Idempotence is required for transactions
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks=all is required for idempotence
props.put(ProducerConfig.ACKS_CONFIG, "all");
DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(props);
return factory;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
transactional.id uniquely identifies the producer across restarts. If the service restarts with the same transactional.id, Kafka fences the previous producer instance and resolves any in-flight transactions. The value order-service-tx- above is a prefix, not a complete id; Spring Kafka appends a unique suffix per producer instance.
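Equivalently, the prefix can be set on the factory itself rather than in the property map. A sketch reusing the same `props` map from the bean above:

```java
// Sketch: configure the transactional-id prefix via the factory's setter;
// Spring Kafka appends a unique suffix per producer instance either way.
DefaultKafkaProducerFactory<String, Object> factory =
        new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("order-service-tx-");
```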
KafkaTransactionManager
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
This integrates with Spring’s @Transactional infrastructure — annotate any method with @Transactional("kafkaTransactionManager") and all KafkaTemplate.send() calls inside are wrapped in a transaction.
Consumer Isolation Level
Set the consumer to read only committed records:
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
With read_committed, the consumer sees records from transactional producers only after the transaction is committed. Aborted records are invisible.
flowchart LR
subgraph Broker["Topic Partition"]
R40["offset 40 (committed)"]
R41["offset 41 (in-flight transaction)"]
R42["offset 42 (committed)"]
R43["offset 43 (aborted)"]
end
subgraph Consumers
RC["read_committed consumer<br/>sees: 40, 42 (skips 41 until commit, skips 43)"]
RU["read_uncommitted consumer<br/>sees: 40, 41, 42, 43"]
end
Broker --> RC
Broker --> RU
Consume-Transform-Produce Pattern
The most common transactional pattern: consume from orders, process, produce to orders-confirmed, commit offset — all atomically.
@Service
@RequiredArgsConstructor
public class OrderConfirmationService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final InventoryService inventoryService;
@Transactional("kafkaTransactionManager")
public void confirmOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
OrderPlacedEvent placed = record.value();
// Business logic
inventoryService.reserveStock(placed);
// Produce to output topic — inside the transaction
OrderConfirmedEvent confirmed = new OrderConfirmedEvent(
placed.getOrderId(),
placed.getCustomerId(),
Instant.now().toEpochMilli()
);
kafkaTemplate.send("orders-confirmed", placed.getOrderId(), confirmed);
// Offset is committed as part of the transaction — not separately
}
}
Wire this into the listener container by setting the transaction manager on the container factory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
transactionalListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
KafkaTransactionManager<String, Object> txManager) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setTransactionManager(txManager);
return factory;
}
When a KafkaTransactionManager is set on the container, Spring Kafka:
- Begins a transaction before calling the listener
- Sends the consumer offset to the transaction via sendOffsetsToTransaction
- Commits the transaction after the listener returns successfully
- Aborts on exception — offset is not committed, and the record will be redelivered
Exactly-Once with @Transactional
@KafkaListener(
topics = "orders",
groupId = "order-confirmation-service",
containerFactory = "transactionalListenerContainerFactory"
)
@Transactional("kafkaTransactionManager")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record) {
OrderPlacedEvent event = record.value();
// Both sends and offset commit happen atomically
kafkaTemplate.send("orders-confirmed", event.getOrderId(),
new OrderConfirmedEvent(event.getOrderId()));
kafkaTemplate.send("inventory-events", event.getOrderId(),
new StockReservedEvent(event.getOrderId(), event.getItems()));
}
If either send fails or the method throws, the transaction aborts — no records land in orders-confirmed or inventory-events, and the offset on orders is not committed.
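Because an abort means redelivery, an unconditionally failing record would otherwise retry forever. Spring Kafka caps this with an AfterRollbackProcessor on the container factory. A sketch, assuming the `kafkaTemplate` and `factory` beans from earlier:

```java
// Sketch: after an aborted transaction, retry the record twice (1s apart),
// then publish it to a dead-letter topic ("<topic>.DLT" by default).
DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(kafkaTemplate);
factory.setAfterRollbackProcessor(
        new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(1000L, 2L)));
```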
Transactional ID Fencing
If an application instance crashes mid-transaction and restarts, Kafka uses the transactional.id to fence the zombie:
sequenceDiagram
participant OldProducer as Old Producer (epoch=5)
participant NewProducer as New Producer (epoch=6)
participant Broker
OldProducer->>Broker: beginTransaction()
OldProducer->>Broker: send(record1)
Note over OldProducer: CRASH 💥
NewProducer->>Broker: initTransactions() [same transactional.id]
Broker->>Broker: bump epoch to 6, fence epoch 5
NewProducer->>Broker: beginTransaction() [epoch=6]
OldProducer->>Broker: commitTransaction() [epoch=5 — REJECTED]
Note over Broker: Old transaction aborted<br/>New transaction can proceed
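On the application side, fencing surfaces as a ProducerFencedException. The handling pattern from the KafkaProducer javadoc, sketched here, distinguishes fatal errors (close, never abort) from transient ones (abort and retry):

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException
        | AuthorizationException e) {
    // Fatal: this instance has been fenced or cannot continue.
    // Do NOT call abortTransaction() — close and let the new
    // instance (higher epoch) own the transactional.id.
    producer.close();
} catch (KafkaException e) {
    // Transient failure: abort so the batch can be retried
    producer.abortTransaction();
}
```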
EOS Modes
Spring Kafka supports two EOS modes:
| Mode | Setting | Behaviour |
|---|---|---|
| V1 (formerly ALPHA) | EOSMode.V1 | Fences on the transactional.id; requires one producer per group/topic/partition |
| V2 (formerly BETA, default) | EOSMode.V2 | Fences via consumer group metadata; more efficient, producers can be reused |
V2 is the default since Spring Kafka 2.8 and requires brokers on Kafka 2.5+. Use V1 only if running an older broker.
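If you need to pin the mode explicitly, it lives on the container properties. A sketch against the `factory` bean from earlier:

```java
// Sketch: select the EOS mode on the listener container factory
// (V2 is already the default on recent Spring Kafka versions).
factory.getContainerProperties()
        .setEosMode(ContainerProperties.EOSMode.V2);
```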
When NOT to Use Transactions
- Simple consumers with idempotent processing — at-least-once + idempotent logic is simpler and faster
- High-throughput fire-and-forget topics — transactions add latency (~30–50ms per commit round trip)
- Multiple consumer group members writing to different partitions — transactions work per producer instance; ensure each instance has a unique transactional.id
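To make the first alternative concrete, here is a minimal sketch of at-least-once plus idempotent processing. The class name and the in-memory set are illustrative; in production the processed-key store would be a database table or cache:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: duplicate deliveries are detected by remembering processed keys,
// so redelivered records produce no second side effect. This often replaces
// transactions entirely for simple consumers.
public class IdempotentHandler {
    private final Set<String> processedKeys = new HashSet<>();

    /** Returns true if the record was processed, false if it was a duplicate. */
    public boolean handle(String recordKey, Runnable businessLogic) {
        if (!processedKeys.add(recordKey)) {
            return false; // already seen: skip side effects
        }
        businessLogic.run();
        return true;
    }
}
```

The trade-off is that the produce itself can still happen twice if the process crashes between producing and storing the key; only a transaction makes the two atomic.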
Key Takeaways
- Kafka transactions make produce + offset commit atomic — either both happen or neither does
- Set transactional.id, enable.idempotence=true, and acks=all on the producer — they are all required together
- Set isolation.level=read_committed on consumers to skip uncommitted and aborted records
- Wire KafkaTransactionManager into the listener container factory to enable EOS consume-transform-produce
- Kafka fences zombie producers with the same transactional.id using epoch numbers
- Use transactions only where exactly-once is genuinely needed — at-least-once + idempotency is sufficient for most applications
Next: @SendTo and @KafkaHandler: Chaining Consumers and Multi-Type Dispatch — forward listener output to another topic with @SendTo and dispatch multiple event types to separate handler methods with @KafkaHandler.