Seeking to Specific Offsets: Replay, Recovery, and Time-Based Seeking

Why Seek Instead of Reset?

Offset management (auto-commit vs manual acknowledgment) controls when offsets advance during normal processing. Seeking is different: it lets you reposition the consumer to any offset — past or future — programmatically, without touching the committed offset in __consumer_offsets.

Common scenarios:

  • Replay from the beginning — reprocess all historical events after a bug fix
  • Resume from a known-good offset — skip a poison pill that’s blocking the consumer
  • Time-based replay — reprocess everything since yesterday 09:00
  • Startup positioning — always start from the end, ignoring backlog on first launch

How Kafka Seeking Works

flowchart LR
    subgraph Broker["Broker: orders-0"]
        direction LR
        O0["offset 0"] --> O1["offset 1"] --> O2["..."] --> O100["offset 100"]
        O0 -.->|"earliest"| S1
        O100 -.->|"latest"| S2
        O42 -.->|"seekToOffset(42)"| S3
    end

    S1["Consumer reads from 0"]
    S2["Consumer reads from end"]
    S3["Consumer reads from 42"]

Seeking changes the position the consumer will use on the next poll(). The committed offset in the broker is unaffected — seeking is local to the consumer instance.


ConsumerSeekAware

Spring Kafka provides ConsumerSeekAware — implement it in your @KafkaListener class to receive a ConsumerSeekCallback when partitions are assigned:

@Component
public class OrderEventListener implements ConsumerSeekAware {

    private ConsumerSeekCallback seekCallback;

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallback = callback;  // store for later use
    }

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // called every time this consumer is assigned partitions
    }

    @Override
    public void onIdleContainer(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // called when the listener container becomes idle
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(OrderPlacedEvent event) {
        // process normally
    }
}

ConsumerSeekCallback exposes the seek methods. You can call them immediately in onPartitionsAssigned, or store the callback and call it later (e.g., from a REST endpoint or scheduled task).


Seek to Beginning — Full Replay

Reprocess all records from offset 0 on every partition assigned to this consumer:

@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

    assignments.forEach((partition, currentOffset) ->
        callback.seekToBeginning(partition.topic(), partition.partition())
    );
}
sequenceDiagram
    participant Listener
    participant Container
    participant Broker

    Container->>Listener: onPartitionsAssigned([orders-0, orders-1])
    Listener->>Container: seekToBeginning(orders-0)
    Listener->>Container: seekToBeginning(orders-1)
    Container->>Broker: poll() starting from offset 0
    Broker-->>Container: records [0 … 99]

This is useful after deploying a bug fix that requires full event replay. The committed offset is not changed — if the consumer restarts normally (without seeking), it will resume from the committed position.


Seek to End — Skip Backlog on Startup

Start consuming only new messages, ignoring any backlog:

@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

    assignments.forEach((partition, currentOffset) ->
        callback.seekToEnd(partition.topic(), partition.partition())
    );
}

Useful for monitoring consumers that only care about real-time events and don’t need to process historical data when they first join.


Seek to a Specific Offset

Skip a known-bad record or resume from a checkpoint:

@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

    // Always start from offset 42 on partition 0
    callback.seek("orders", 0, 42L);
}

Skipping a Poison Pill at Runtime

If a specific offset is causing repeated failures, seek past it without restarting the consumer:

@KafkaListener(topics = "orders", groupId = "order-service")
public void onOrder(ConsumerRecord<String, OrderPlacedEvent> record, Acknowledgment ack) {
    try {
        orderService.process(record.value());
        ack.acknowledge();
    } catch (UnrecoverableException e) {
        log.error("Poison pill at offset={}, skipping", record.offset());
        // seek past the bad record — next poll will start from offset+1
        seekCallback.seek(record.topic(), record.partition(), record.offset() + 1);
        ack.acknowledge();  // commit offset so we don't retry on restart
    }
}
sequenceDiagram
    participant Listener
    participant Container
    participant Broker

    Broker-->>Container: record at offset 55 (poison pill)
    Container->>Listener: onOrder(record=55)
    Listener->>Listener: UnrecoverableException 💥
    Listener->>Container: seek(orders-0, offset=56)
    Listener->>Container: ack.acknowledge()
    Container->>Broker: commitOffset(56)
    Container->>Broker: poll() → records [56, 57, 58...]
    Note over Listener: Offset 55 is skipped permanently

Time-Based Seeking — Replay Since a Timestamp

Seek all assigned partitions to the first offset whose timestamp is at or after a given instant:

@Override
public void onPartitionsAssigned(
        Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

    Instant replayFrom = Instant.now().minus(Duration.ofHours(24));
    long epochMs = replayFrom.toEpochMilli();

    assignments.forEach((partition, currentOffset) ->
        callback.seekToTimestamp(partition.topic(), partition.partition(), epochMs)
    );
}

Kafka maps the timestamp to the earliest offset whose record timestamp is ≥ the requested epoch. If no record has a timestamp that late, the consumer is positioned at the end.

sequenceDiagram
    participant Consumer
    participant Broker

    Consumer->>Broker: offsetsForTimes({orders-0: epochMs=T})
    Broker-->>Consumer: {orders-0: offset=87}
    Consumer->>Consumer: seek(orders-0, 87)
    Consumer->>Broker: poll() → records from offset 87 onward

Practical Example: Replay Yesterday’s Orders

@Component
public class OrderReplayListener implements ConsumerSeekAware {

    private static final Logger log = LoggerFactory.getLogger(OrderReplayListener.class);

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

        long yesterdayMidnight = LocalDate.now()
            .minusDays(1)
            .atStartOfDay(ZoneOffset.UTC)
            .toInstant()
            .toEpochMilli();

        assignments.keySet().forEach(tp -> {
            log.info("Seeking {}-{} to yesterday midnight", tp.topic(), tp.partition());
            callback.seekToTimestamp(tp.topic(), tp.partition(), yesterdayMidnight);
        });
    }

    @KafkaListener(topics = "orders", groupId = "order-replay-service")
    public void onOrder(OrderPlacedEvent event) {
        // reprocess
    }
}

Triggering a Seek via REST Endpoint

Store the callback at assignment time and expose a REST endpoint to trigger replay on demand:

@RestController
@RequiredArgsConstructor
public class ReplayController {

    private final OrderEventListener listener;

    @PostMapping("/admin/replay")
    public ResponseEntity<String> replayAll() {
        listener.replayFromBeginning();
        return ResponseEntity.ok("Replay triggered");
    }
}

@Component
public class OrderEventListener implements ConsumerSeekAware {

    private ConsumerSeekCallback seekCallback;
    private Set<TopicPartition> assignedPartitions = ConcurrentHashMap.newKeySet();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallback = callback;
    }

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        assignedPartitions.addAll(assignments.keySet());
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        assignedPartitions.removeAll(partitions);
    }

    public void replayFromBeginning() {
        assignedPartitions.forEach(tp ->
            seekCallback.seekToBeginning(tp.topic(), tp.partition())
        );
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(OrderPlacedEvent event) {
        // process
    }
}

Seek Behaviour Summary

flowchart TD
    Q1{When do you need to seek?}
    Q1 -->|On every startup| A1["onPartitionsAssigned\n— always repositions on restart"]
    Q1 -->|On demand at runtime| A2["Store seekCallback\n— call from REST/scheduler"]
    Q1 -->|After a processing failure| A3["Seek inside @KafkaListener\n— skip or retry specific offset"]

    A1 --> B1{"What position?"}
    A2 --> B1
    A3 --> B2["seek(topic, partition, offset+1)\nto skip the bad record"]

    B1 -->|Full replay| C1["seekToBeginning()"]
    B1 -->|Skip backlog| C2["seekToEnd()"]
    B1 -->|Known checkpoint| C3["seek(offset)"]
    B1 -->|Time-based| C4["seekToTimestamp(epochMs)"]

ConsumerSeekCallback Reference

MethodDescription
seekToBeginning(topic, partition)Seek to offset 0
seekToEnd(topic, partition)Seek to the latest offset
seek(topic, partition, offset)Seek to a specific offset
seekToTimestamp(topic, partition, epochMs)Seek to first offset at or after timestamp
seekRelative(topic, partition, offset, toCurrent)Seek relative to current or beginning

All methods take effect on the next poll() — there is no synchronous confirmation.


Key Takeaways

  • Seeking repositions the consumer locally — it does not change the committed offset in __consumer_offsets
  • Implement ConsumerSeekAware to receive a ConsumerSeekCallback when partitions are assigned
  • onPartitionsAssigned is the right place for startup-time seeking (replay, skip backlog)
  • Store seekCallback from registerSeekCallback to trigger seeks at runtime (REST endpoints, scheduled tasks)
  • Time-based seeking (seekToTimestamp) is ideal for replaying a rolling window of events after incidents
  • Seeking past a poison pill at runtime (seek(offset + 1) + ack.acknowledge()) avoids consumer group restarts

Next: Consumer @Bean Configuration: ConcurrentKafkaListenerContainerFactory — configure concurrency, error handlers, and container properties using Java @Bean definitions instead of application.properties.