Filtering Messages with RecordFilterStrategy

Why Filter at the Container Level?

Multiple consumers can share a topic. The inventory service only cares about PLACED orders; the analytics service wants everything. Rather than putting if (event.getStatus() != PLACED) return; at the top of every listener, Spring Kafka lets you filter records before they reach your method — keeping business logic clean and making the filter reusable across listeners.


How RecordFilterStrategy Works

flowchart LR
    Broker -->|"poll() → [r1, r2, r3, r4]"| Container
    Container -->|"filter(r1) → false\nfilter(r2) → true\nfilter(r3) → false\nfilter(r4) → true"| Filter["RecordFilterStrategy"]
    Filter -->|"[r2, r4] passed"| Listener["@KafkaListener"]
    Filter -->|"[r1, r3] discarded"| Void["(dropped — offset still advances)"]

Filtered records are silently discarded — their offsets still advance. They are not sent to a DLT or retried. Use this only for records you genuinely do not care about.


Implementing RecordFilterStrategy

RecordFilterStrategy is a @FunctionalInterface — return true to filter (discard), false to keep:

@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter() {
    return record -> {
        OrderPlacedEvent event = record.value();
        // return true = discard this record
        return event == null || event.getStatus() != OrderStatus.PLACED;
    };
}

Wire it into the factory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
        inventoryListenerFactory(
                ConsumerFactory<String, OrderPlacedEvent> consumerFactory,
                RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter) {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRecordFilterStrategy(placedOrderFilter);
    return factory;
}

The @KafkaListener only receives records where filter() returned false:

@KafkaListener(
    topics = "orders",
    containerFactory = "inventoryListenerFactory"
)
public void onPlacedOrder(OrderPlacedEvent event) {
    // guaranteed: event.getStatus() == PLACED
    inventoryService.reserveStock(event);
}

Filter by Header

@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> regionFilter() {
    return record -> {
        Header regionHeader = record.headers().lastHeader("X-Region");
        if (regionHeader == null) return true;  // discard if no region
        return !"EU".equals(new String(regionHeader.value()));
    };
}

Only records with X-Region: EU reach the listener.


Filter by Key

@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> highValueFilter() {
    return record -> {
        // keys are formatted as "customerId:orderId"
        // discard records where key does not start with "VIP-"
        String key = record.key();
        return key == null || !key.startsWith("VIP-");
    };
}

Filter by Partition

Useful when running integration tests against a shared topic and you only want specific partitions:

@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> partition0Filter() {
    return record -> record.partition() != 0;
}

Ack Discarded Records (AckDiscarded)

By default, filtered records advance the offset — but in MANUAL AckMode you may need to explicitly acknowledge them. Set ackDiscarded to handle this automatically:

factory.setAckDiscarded(true);

When true, the container calls acknowledge() on discarded records so their offsets are committed even in manual mode. When false (default), discarded records are silently dropped without acknowledgment — the offset still advances because the container manages it internally in automatic modes.


Batch Filtering

With setBatchListener(true), the container applies the filter to each record in the batch individually, delivering only the passing subset:

sequenceDiagram
    participant Broker
    participant Container
    participant Filter
    participant Listener

    Broker-->>Container: poll() → [r40, r41, r42, r43, r44]
    Container->>Filter: filter(r40) → false (keep)
    Container->>Filter: filter(r41) → true (discard)
    Container->>Filter: filter(r42) → false (keep)
    Container->>Filter: filter(r43) → true (discard)
    Container->>Filter: filter(r44) → false (keep)
    Container->>Listener: onBatch([r40, r42, r44])

The same RecordFilterStrategy bean works for both single and batch listeners.


Combining Filter with Error Handler

Filtered records never reach the listener, so error handlers never see them. The two concerns are independent:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
        inventoryListenerFactory(
                ConsumerFactory<String, OrderPlacedEvent> consumerFactory,
                RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter,
                DefaultErrorHandler errorHandler) {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRecordFilterStrategy(placedOrderFilter);
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

Errors thrown inside the listener (for records that passed the filter) are handled by the error handler as normal.


RecordFilterStrategy vs @KafkaHandler

ApproachWhere filter runsFailed recordsUse when
RecordFilterStrategyContainer, before listenerSilently discardedCheap, stateless checks on key/header/value field
if/return in listenerInside listener methodOffset advances, no errorYou need access to database or external state
@KafkaHandler dispatchInside class method dispatchRoutes by typeMultiple payload types on one topic

Key Takeaways

  • RecordFilterStrategy is a @FunctionalInterface — return true to discard a record, false to keep it
  • Filtering happens after deserialization but before your listener method is called
  • Filtered records are silently dropped; their offsets still advance — do not use filtering as a retry mechanism
  • Set ackDiscarded(true) on the factory when using MANUAL AckMode to avoid stalled commits
  • The same strategy works for both single-record and batch listeners
  • Keep filters cheap and stateless — they run on the polling thread for every record

Next: Pausing, Resuming, and Stopping Listener Containers — control listener container state at runtime for circuit breakers, back-pressure, and graceful shutdown.