Seeking to Specific Offsets: Replay, Recovery, and Time-Based Seeking
Why Seek Instead of Reset?
Offset management (auto-commit vs manual acknowledgment) controls when offsets advance during normal processing. Seeking is different: it lets you reposition the consumer to any offset — past or future — programmatically, without touching the committed offset in `__consumer_offsets`.
Common scenarios:
- Replay from the beginning — reprocess all historical events after a bug fix
- Resume from a known-good offset — skip a poison pill that’s blocking the consumer
- Time-based replay — reprocess everything since yesterday 09:00
- Startup positioning — always start from the end, ignoring backlog on first launch
How Kafka Seeking Works
flowchart LR
subgraph Broker["Broker: orders-0"]
direction LR
O0["offset 0"] --> O1["offset 1"] --> Oa["..."] --> O42["offset 42"] --> Ob["..."] --> O100["offset 100"]
O0 -.->|"earliest"| S1
O100 -.->|"latest"| S2
O42 -.->|"seek(42)"| S3
end
S1["Consumer reads from 0"]
S2["Consumer reads from end"]
S3["Consumer reads from 42"]
Seeking changes the position the consumer will use on the next poll(). The committed offset in the broker is unaffected — seeking is local to the consumer instance.
ConsumerSeekAware
Spring Kafka provides ConsumerSeekAware — implement it in your @KafkaListener class to receive a ConsumerSeekCallback when partitions are assigned:
@Component
public class OrderEventListener implements ConsumerSeekAware {

    private ConsumerSeekCallback seekCallback;

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // store for later use; with container concurrency > 1 each consumer
        // thread registers its own callback, so prefer a ThreadLocal there
        this.seekCallback = callback;
    }

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // called every time this consumer is assigned partitions
    }

    @Override
    public void onIdleContainer(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // called when the listener container becomes idle
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(OrderPlacedEvent event) {
        // process normally
    }
}
ConsumerSeekCallback exposes the seek methods. You can call them immediately in onPartitionsAssigned, or store the callback and call it later (e.g., from a REST endpoint or scheduled task).
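If you only need the common cases, newer Spring Kafka versions also ship `AbstractConsumerSeekAware`, which stores the callbacks for you (per consumer thread, so it is safe with concurrency > 1). A minimal sketch, assuming Spring Kafka 2.6+ for the no-argument convenience methods:

```java
@Component
public class ReplayableOrderListener extends AbstractConsumerSeekAware {

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(OrderPlacedEvent event) {
        // process normally
    }

    // safe to call from any thread, e.g. a REST controller or scheduler
    public void replayAll() {
        seekToBeginning(); // seeks every partition currently assigned to this listener
    }
}
```

The base class also offers `seekToEnd()` and `seekToTimestamp(long)` counterparts, so the manual callback bookkeeping shown above is only needed for finer-grained control.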
Seek to Beginning — Full Replay
Reprocess all records from the earliest available offset (offset 0, unless retention has already removed older segments) on every partition assigned to this consumer:
@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {
    assignments.forEach((partition, currentOffset) ->
        callback.seekToBeginning(partition.topic(), partition.partition())
    );
}
sequenceDiagram
participant Listener
participant Container
participant Broker
Container->>Listener: onPartitionsAssigned([orders-0, orders-1])
Listener->>Container: seekToBeginning(orders-0)
Listener->>Container: seekToBeginning(orders-1)
Container->>Broker: poll() starting from offset 0
Broker-->>Container: records [0 … 99]
This is useful after deploying a bug fix that requires full event replay. The committed offset is not changed — if the consumer restarts normally (without seeking), it will resume from the committed position.
Seek to End — Skip Backlog on Startup
Start consuming only new messages, ignoring any backlog:
@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {
    assignments.forEach((partition, currentOffset) ->
        callback.seekToEnd(partition.topic(), partition.partition())
    );
}
Useful for monitoring consumers that only care about real-time events and don’t need to process historical data when they first join.
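This overlaps with, but is not the same as, the `auto.offset.reset` consumer property: the property only applies when the group has no valid committed offset, whereas `seekToEnd` in `onPartitionsAssigned` skips the backlog on every assignment. If you only care about the very first start of a new group, plain configuration may be enough (standard Spring Boot property shown):

```properties
# Applies only when the group has no valid committed offset;
# once offsets are committed, the consumer resumes from them.
spring.kafka.consumer.auto-offset-reset=latest
```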
Seek to a Specific Offset
Skip a known-bad record or resume from a checkpoint:
@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {
    // resume orders-0 from a known checkpoint; guard first, because
    // partition 0 may not be assigned to this consumer instance
    TopicPartition target = new TopicPartition("orders", 0);
    if (assignments.containsKey(target)) {
        callback.seek("orders", 0, 42L);
    }
}
Skipping a Poison Pill at Runtime
If a specific offset is causing repeated failures, seek past it without restarting the consumer:
@KafkaListener(topics = "orders", groupId = "order-service")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record, Acknowledgment ack) {
    try {
        orderService.process(record.value());
        ack.acknowledge();
    } catch (UnrecoverableException e) {
        log.error("Poison pill at offset={}, skipping", record.offset());
        // seek past the bad record — next poll will start from offset+1
        seekCallback.seek(record.topic(), record.partition(), record.offset() + 1);
        ack.acknowledge(); // commit offset so we don't retry on restart
    }
}
sequenceDiagram
participant Listener
participant Container
participant Broker
Broker-->>Container: record at offset 55 (poison pill)
Container->>Listener: onOrder(record=55)
Listener->>Listener: UnrecoverableException 💥
Listener->>Container: seek(orders-0, offset=56)
Listener->>Container: ack.acknowledge()
Container->>Broker: commitOffset(56)
Container->>Broker: poll() → records [56, 57, 58...]
Note over Listener: Offset 55 is skipped permanently
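Manual seeking is not the only way past a poison pill: Spring Kafka's error handler can skip a failed record declaratively. A sketch, assuming Spring Kafka 2.8+ (where `DefaultErrorHandler` is the default handler); once the back-off is exhausted the recoverer runs (by default it just logs) and the container commits and moves on:

```java
// Sketch: retry twice, 1 s apart, then hand the record to the recoverer and skip it.
@Bean
public DefaultErrorHandler errorHandler() {
    DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 2));
    // failures we know are unrecoverable should be skipped without retries
    handler.addNotRetryableExceptions(UnrecoverableException.class);
    return handler;
}
```

Wire it in with `factory.setCommonErrorHandler(...)` on the listener container factory; for production, a `DeadLetterPublishingRecoverer` can publish the skipped record to a dead-letter topic instead of only logging it.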
Time-Based Seeking — Replay Since a Timestamp
Seek all assigned partitions to the first offset whose timestamp is at or after a given instant:
@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {
    Instant replayFrom = Instant.now().minus(Duration.ofHours(24));
    long epochMs = replayFrom.toEpochMilli();
    assignments.forEach((partition, currentOffset) ->
        callback.seekToTimestamp(partition.topic(), partition.partition(), epochMs)
    );
}
Kafka maps the timestamp to the earliest offset whose record timestamp is ≥ the requested epoch. If every record in the partition is older than the requested timestamp, the consumer is positioned at the end of the partition.
sequenceDiagram
participant Consumer
participant Broker
Consumer->>Broker: offsetsForTimes({orders-0: epochMs=T})
Broker-->>Consumer: {orders-0: offset=87}
Consumer->>Consumer: seek(orders-0, 87)
Consumer->>Broker: poll() → records from offset 87 onward
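The diagram above is just the plain consumer API: `offsetsForTimes` followed by `seek`. A raw kafka-clients sketch (assuming an already-assigned `KafkaConsumer` named `consumer`), in case the same positioning is needed outside Spring:

```java
TopicPartition tp = new TopicPartition("orders", 0);
long epochMs = Instant.now().minus(Duration.ofHours(24)).toEpochMilli();

// map the timestamp to the first offset whose record timestamp is >= epochMs
Map<TopicPartition, OffsetAndTimestamp> offsets =
        consumer.offsetsForTimes(Map.of(tp, epochMs));

OffsetAndTimestamp result = offsets.get(tp);
if (result != null) {
    consumer.seek(tp, result.offset()); // position at the mapped offset
} else {
    consumer.seekToEnd(List.of(tp));    // nothing that recent; go to the end
}
```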
Practical Example: Replay Yesterday’s Orders
@Component
public class OrderReplayListener implements ConsumerSeekAware {

    private static final Logger log = LoggerFactory.getLogger(OrderReplayListener.class);

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // compute "yesterday at 00:00" consistently in UTC
        long yesterdayMidnight = LocalDate.now(ZoneOffset.UTC)
                .minusDays(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
        assignments.keySet().forEach(tp -> {
            log.info("Seeking {}-{} to yesterday midnight", tp.topic(), tp.partition());
            callback.seekToTimestamp(tp.topic(), tp.partition(), yesterdayMidnight);
        });
    }

    @KafkaListener(topics = "orders", groupId = "order-replay-service")
    public void onOrder(OrderPlacedEvent event) {
        // reprocess
    }
}
Triggering a Seek via REST Endpoint
Store the callback when it is registered, track the assigned partitions, and expose a REST endpoint to trigger replay on demand. Seeks issued from a non-consumer thread are queued by Spring Kafka and applied before the consumer's next poll():
@RestController
@RequiredArgsConstructor
public class ReplayController {

    private final OrderEventListener listener;

    @PostMapping("/admin/replay")
    public ResponseEntity<String> replayAll() {
        listener.replayFromBeginning();
        return ResponseEntity.ok("Replay triggered");
    }
}

@Component
public class OrderEventListener implements ConsumerSeekAware {

    private ConsumerSeekCallback seekCallback;
    private final Set<TopicPartition> assignedPartitions = ConcurrentHashMap.newKeySet();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallback = callback;
    }

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        assignedPartitions.addAll(assignments.keySet());
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        assignedPartitions.removeAll(partitions);
    }

    public void replayFromBeginning() {
        assignedPartitions.forEach(tp ->
            seekCallback.seekToBeginning(tp.topic(), tp.partition())
        );
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(OrderPlacedEvent event) {
        // process
    }
}
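Once the application is up (port 8080 assumed), the replay can be triggered on demand:

```shell
curl -X POST http://localhost:8080/admin/replay
```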
Seek Behaviour Summary
flowchart TD
Q1{When do you need to seek?}
Q1 -->|On every startup| A1["onPartitionsAssigned\n— always repositions on restart"]
Q1 -->|On demand at runtime| A2["Store seekCallback\n— call from REST/scheduler"]
Q1 -->|After a processing failure| A3["Seek inside @KafkaListener\n— skip or retry specific offset"]
A1 --> B1{"What position?"}
A2 --> B1
A3 --> B2["seek(topic, partition, offset+1)\nto skip the bad record"]
B1 -->|Full replay| C1["seekToBeginning()"]
B1 -->|Skip backlog| C2["seekToEnd()"]
B1 -->|Known checkpoint| C3["seek(offset)"]
B1 -->|Time-based| C4["seekToTimestamp(epochMs)"]
ConsumerSeekCallback Reference
| Method | Description |
|---|---|
| `seekToBeginning(topic, partition)` | Seek to the earliest available offset |
| `seekToEnd(topic, partition)` | Seek to the latest offset |
| `seek(topic, partition, offset)` | Seek to a specific offset |
| `seekToTimestamp(topic, partition, epochMs)` | Seek to the first offset at or after the timestamp |
| `seekRelative(topic, partition, offset, toCurrent)` | Seek relative to the current position (`toCurrent = true`), or relative to the beginning for positive / the end for negative offsets (`toCurrent = false`) |
All methods take effect on the next poll() — there is no synchronous confirmation.
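`seekRelative`, not demonstrated above, helps with "replay the last N records" scenarios. A short sketch (the counts are illustrative):

```java
// With toCurrent = false, a negative offset is relative to the END of the
// partition: position 100 records before the latest offset.
callback.seekRelative("orders", 0, -100, false);

// With toCurrent = true, the offset is relative to the CURRENT position:
// step back 10 records from wherever the consumer is now.
callback.seekRelative("orders", 0, -10, true);
```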
Key Takeaways
- Seeking repositions the consumer locally — it does not change the committed offset in `__consumer_offsets`
- Implement `ConsumerSeekAware` to receive a `ConsumerSeekCallback` when partitions are assigned
- `onPartitionsAssigned` is the right place for startup-time seeking (replay, skip backlog)
- Store the `seekCallback` from `registerSeekCallback` to trigger seeks at runtime (REST endpoints, scheduled tasks)
- Time-based seeking (`seekToTimestamp`) is ideal for replaying a rolling window of events after incidents
- Seeking past a poison pill at runtime (`seek(offset + 1)` + `ack.acknowledge()`) avoids consumer group restarts
Next: Consumer @Bean Configuration: ConcurrentKafkaListenerContainerFactory — configure concurrency, error handlers, and container properties using Java @Bean definitions instead of application.properties.