Idempotent Producers: Eliminating Duplicate Messages
The Duplicate Problem
With acks=all and retries enabled, a produce request might be acknowledged by the broker, but the acknowledgment is lost in the network before reaching the producer. The producer, seeing no response, retries — sending the same record again. The broker writes it a second time. The consumer sees a duplicate.
sequenceDiagram
participant Producer
participant Leader as Broker (Leader)
Producer->>Leader: ProduceRequest: OrderPlaced (orderId=1001)
Leader->>Leader: Write record at offset 42 ✓
Leader--xProducer: ProduceResponse LOST (network failure)
Note over Producer: No ack received — retrying
Producer->>Leader: ProduceRequest: OrderPlaced (orderId=1001) [RETRY]
Leader->>Leader: Write record at offset 43 ✓ (DUPLICATE!)
Leader-->>Producer: ProduceResponse ✓
Note over Leader: Partition now has TWO copies\nof the same OrderPlaced event!
For order events, this is a serious problem: the inventory service deducts stock twice, the payment service charges twice, the customer gets two confirmation emails.
How Idempotence Works
Kafka’s idempotent producer assigns every producer a unique Producer ID (PID) and increments a sequence number for each record sent to each partition. The broker tracks the last sequence number per (PID, partition) pair and silently drops any record it has already written.
flowchart TB
subgraph Producer
PID["Producer ID: 42\n(assigned by broker at startup)"]
SeqCounter["Sequence counter per partition:\norders-P0: 100\norders-P1: 75\npayments-P0: 30"]
end
subgraph Broker
State["Broker state per (PID, partition):\nPID=42, orders-P0: last_seq=99\nPID=42, orders-P1: last_seq=74"]
end
Producer -->|"Record: PID=42, seq=100, OrderPlaced"| Broker
Broker -->|"seq=100, expected=100 ✓ — write"| Accepted["Write at offset 150"]
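The per-partition counters in the diagram can be sketched in plain Java. This is an illustration of the bookkeeping only, not the actual kafka-clients internals; the class and method names are invented for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of producer-side sequence bookkeeping: one monotonically
// increasing counter per target partition, attached to each record.
public class SequenceCounters {
    private final Map<String, Integer> nextSeqByPartition = new HashMap<>();

    // Returns the sequence number for the next record sent to this
    // partition, then advances the counter.
    public int nextSequence(String topicPartition) {
        int seq = nextSeqByPartition.getOrDefault(topicPartition, 0);
        nextSeqByPartition.put(topicPartition, seq + 1);
        return seq;
    }
}
```

Each partition advances independently, which is why the diagram shows orders-P0 at 100 while orders-P1 sits at 75.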
On retry of the same record:
sequenceDiagram
participant Producer
participant Broker
Producer->>Broker: PID=42, seq=100, OrderPlaced (orderId=1001)
Broker->>Broker: Write at offset 150. Last seq for PID=42,P0 = 100 ✓
Note over Broker: Ack is LOST in network
Producer->>Broker: PID=42, seq=100, OrderPlaced (orderId=1001) [RETRY]
Broker->>Broker: seq=100 already seen for PID=42,P0 — DUPLICATE!
Broker-->>Producer: DuplicateSequenceNumber (silently ignored, returns original ack)
Note over Producer: Producer receives ack as if first attempt succeeded
Note over Broker: No duplicate written — exactly once at broker level
Enabling Idempotence
One property enables the entire mechanism:
spring.kafka.producer.properties.enable.idempotence=true
Or in @Bean configuration:
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Enabling idempotence automatically enforces:
- acks=all (required — idempotence needs the full ISR to track the sequence)
- max.in.flight.requests.per.connection=5 (at most 5; in-order delivery is still guaranteed)
- retries=Integer.MAX_VALUE (the producer retries until the delivery timeout expires)
If you explicitly set acks=0 or acks=1 together with enable.idempotence=true, the producer client throws a ConfigException at startup.
Producer ID and Epoch
sequenceDiagram
participant Producer
participant Broker
Producer->>Broker: InitProducerId request
Broker-->>Producer: ProducerID=42, Epoch=0
Note over Producer: Producer now has identity:\nPID=42, Epoch=0
Note over Producer: Producer crashes and restarts
Producer->>Broker: InitProducerId request (wants PID=42 back)
Broker-->>Producer: ProducerID=42, Epoch=1 (epoch bumped!)
Note over Broker: Old producer (Epoch=0) records\nare now rejected if they arrive\n→ zombie producer protection
The epoch is incremented on every producer restart. Any in-flight requests from the old producer instance (lower epoch) are rejected by the broker. This prevents a “zombie” old producer from interfering with a restarted producer.
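The fencing rule above can be sketched in plain Java. This is an illustration of the idea, not the broker's actual implementation; the class and method names are invented:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of zombie fencing: the broker remembers the latest epoch per
// PID, and produce requests carrying an older epoch are rejected.
public class EpochFencing {
    private final Map<Long, Short> currentEpochByPid = new HashMap<>();

    // Called when a producer (re)initializes: bump and return the epoch.
    public short initProducer(long pid) {
        short epoch = (short) (currentEpochByPid.getOrDefault(pid, (short) -1) + 1);
        currentEpochByPid.put(pid, epoch);
        return epoch;
    }

    // A produce request is accepted only if it carries the current epoch.
    public boolean accept(long pid, short epoch) {
        return currentEpochByPid.getOrDefault(pid, (short) -1) == epoch;
    }
}
```

A restarted producer calling initProducer(42) a second time receives epoch 1, after which any request still in flight with epoch 0 fails the accept check.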
Sequence Numbers and In-Order Delivery
The broker tracks the last accepted sequence number per (PID, partition). Records must arrive in sequence — no gaps allowed.
flowchart TD
R100["Record: seq=100 (next expected)"]
R101["Record: seq=101"]
R102["Record: seq=102"]
OldRecord["Record: seq=99 (old, retry)"]
FutureRecord["Record: seq=104 (gap! seq=103 missing)"]
R100 -->|"✓ In order — accept"| Write
R101 -->|"✓ In order — accept"| Write
R102 -->|"✓ In order — accept"| Write
OldRecord -->|"✗ Already seen — duplicate, drop"| Drop
FutureRecord -->|"✗ Out of order — OutOfOrderSequenceException"| Error
OutOfOrderSequenceException (a non-retriable error) indicates a serious producer bug or configuration error, not a transient network issue. In practice, the client library prevents this from happening under normal operation.
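The accept / duplicate / out-of-order decision can be sketched in plain Java. This is a simplified model (the real broker caches metadata for recent batches per producer rather than a single sequence number), but the decision shape is the same; the names are invented for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the broker-side sequence check: accept the next expected
// sequence, drop already-seen sequences as duplicates, reject gaps.
public class SequenceCheck {
    public enum Outcome { ACCEPT, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence per "pid:partition" key (-1 means none yet).
    private final Map<String, Integer> lastSeq = new HashMap<>();

    public Outcome onRecord(long pid, String partition, int seq) {
        String key = pid + ":" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (seq == last + 1) {      // next expected: write it
            lastSeq.put(key, seq);
            return Outcome.ACCEPT;
        } else if (seq <= last) {   // already written: drop, re-ack
            return Outcome.DUPLICATE;
        } else {                    // gap: a record went missing
            return Outcome.OUT_OF_ORDER;
        }
    }
}
```

Note that state is keyed on (PID, partition), so a new producer (different PID) starts its own sequence independently.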
Idempotence Scope: Per Producer, Per Partition
Idempotence guarantees exactly-once delivery within a single producer session, for a single partition. It does not provide:
- Cross-partition atomic writes (use transactions for that — article 29)
- Idempotence across producer restarts (without a transactional.id, a restarted producer receives a fresh PID, so deduplication does not span sessions)
- Protection against application-level duplicates (e.g. a bug that calls kafkaTemplate.send() twice for the same event)
flowchart LR
subgraph Covers["Idempotence covers"]
C1["✓ Network retries causing duplicate sends"]
C2["✓ Producer-internal retry of failed records"]
C3["✓ Zombie producer records from old epoch"]
end
subgraph DoesNotCover["Idempotence does NOT cover"]
D1["✗ Application calling send() twice on same event"]
D2["✗ Consumer processing the same record twice\n(that requires idempotent consumer logic)"]
D3["✗ Atomicity across multiple topics\n(that requires transactions)"]
end
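Because the producer cannot see application-level duplicates, the consumer side needs its own deduplication by a business key. A minimal sketch, keyed on a hypothetical orderId; the in-memory set is for illustration only, since production code would persist seen keys:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of idempotent consumer logic: deduplicate by a business key
// (here a hypothetical orderId) before applying side effects such as
// stock deduction or payment capture.
public class OrderDeduplicator {
    private final Set<String> processedOrderIds = new HashSet<>();

    // Returns true if this event should be processed, false if the
    // same orderId was already handled.
    public boolean shouldProcess(String orderId) {
        return processedOrderIds.add(orderId); // add() is false if present
    }
}
```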
Complete Idempotent Producer Configuration
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();

        // Connection
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service-producer");

        // Serialization
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Idempotence (sets acks=all, retries=MAX, max.in.flight=5 automatically)
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Explicit timeouts (override the idempotence defaults if needed)
        config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // Performance
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
Testing Idempotence
Verify idempotence is working by inspecting producer metrics:
@Component
public class KafkaProducerMetrics {

    private final MeterRegistry meterRegistry;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducerMetrics(MeterRegistry meterRegistry,
                                KafkaTemplate<String, Object> kafkaTemplate) {
        this.meterRegistry = meterRegistry;
        this.kafkaTemplate = kafkaTemplate;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void registerMetrics() {
        // Bind Kafka producer metrics to Micrometer
        KafkaClientMetrics kafkaMetrics = new KafkaClientMetrics(
                kafkaTemplate.getProducerFactory().createProducer()
        );
        kafkaMetrics.bindTo(meterRegistry);
    }
}
The metric kafka.producer.record.retry.rate shows retry rate. The metric kafka.producer.record.error.rate shows send failures. With idempotence enabled, record.error.rate should be near zero under normal network conditions.
Idempotence vs Transactions
| Feature | Idempotence | Transactions |
|---|---|---|
| Scope | Single producer, single partition | Multiple topics, multiple partitions |
| Setup | enable.idempotence=true | transactional.id + KafkaTransactionManager |
| Consumer requirement | None | Consumer must set isolation.level=read_committed |
| Performance overhead | Low | Higher (2-phase commit) |
| Use case | Prevent duplicate sends | Atomic publish across topics |
Use idempotence for all producers. Add transactions only when you need atomic writes across multiple topics or a read-process-write pattern (consume from one topic, publish to another, atomically).
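For reference, moving from plain idempotence to transactions in spring-kafka is mostly a matter of assigning a transactional ID on the producer factory. A sketch, with an illustrative prefix value:

```java
// Sketch: enabling transactions on a DefaultKafkaProducerFactory.
// Assumes spring-kafka; the "order-service-tx-" prefix is illustrative.
// Transactions imply idempotence, so enable.idempotence stays true.
DefaultKafkaProducerFactory<String, Object> factory =
        new DefaultKafkaProducerFactory<>(config);
factory.setTransactionIdPrefix("order-service-tx-");

// Pair with a KafkaTransactionManager for @Transactional support:
KafkaTransactionManager<String, Object> txManager =
        new KafkaTransactionManager<>(factory);
```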
Key Takeaways
- Without idempotence, retries after a lost acknowledgment cause duplicate records — the broker writes the same record twice
- Idempotence uses a Producer ID (PID) + epoch + per-partition sequence numbers to detect and drop duplicates
- Enable with enable.idempotence=true — this automatically sets acks=all, retries=MAX, and max.in.flight=5
- The epoch is incremented on producer restart — old-epoch records from a crashed producer are rejected (zombie protection)
- Idempotence covers network-level retries; it does not cover application-level bugs that call send() twice
- Idempotence is per-producer, per-partition — for atomic cross-topic writes, use transactions (article 29)
- Always enable idempotence in production — the overhead is minimal and the protection is significant
Next: Kafka Consumer in Spring Boot: @KafkaListener Basics — build your first consumer, configure the listener, and process messages from the orders topic.