Filtering Messages with RecordFilterStrategy
Why Filter at the Container Level?
Multiple consumers can share a topic. The inventory service only cares about PLACED orders; the analytics service wants everything. Rather than putting if (event.getStatus() != PLACED) return; at the top of every listener, Spring Kafka lets you filter records before they reach your method — keeping business logic clean and making the filter reusable across listeners.
How RecordFilterStrategy Works
flowchart LR
Broker -->|"poll() → [r1, r2, r3, r4]"| Container
Container -->|"filter(r1) → false\nfilter(r2) → true\nfilter(r3) → false\nfilter(r4) → true"| Filter["RecordFilterStrategy"]
Filter -->|"[r2, r4] passed"| Listener["@KafkaListener"]
Filter -->|"[r1, r3] discarded"| Void["(dropped — offset still advances)"]
Filtered records are silently discarded — their offsets still advance. They are not sent to a DLT or retried. Use this only for records you genuinely do not care about.
Implementing RecordFilterStrategy
RecordFilterStrategy is a @FunctionalInterface — return true to filter (discard), false to keep:
@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter() {
return record -> {
OrderPlacedEvent event = record.value();
// return true = discard this record
return event == null || event.getStatus() != OrderStatus.PLACED;
};
}
Wire it into the factory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
inventoryListenerFactory(
ConsumerFactory<String, OrderPlacedEvent> consumerFactory,
RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setRecordFilterStrategy(placedOrderFilter);
return factory;
}
The @KafkaListener only receives records where filter() returned false:
@KafkaListener(
topics = "orders",
containerFactory = "inventoryListenerFactory"
)
public void onPlacedOrder(OrderPlacedEvent event) {
// guaranteed: event.getStatus() == PLACED
inventoryService.reserveStock(event);
}
Filter by Header
@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> regionFilter() {
return record -> {
Header regionHeader = record.headers().lastHeader("X-Region");
if (regionHeader == null) return true; // discard if no region
return !"EU".equals(new String(regionHeader.value()));
};
}
Only records with X-Region: EU reach the listener.
Filter by Key
@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> highValueFilter() {
return record -> {
// keys are formatted as "customerId:orderId"
// discard records where key does not start with "VIP-"
String key = record.key();
return key == null || !key.startsWith("VIP-");
};
}
Filter by Partition
Useful when running integration tests against a shared topic and you only want specific partitions:
@Bean
public RecordFilterStrategy<String, OrderPlacedEvent> partition0Filter() {
return record -> record.partition() != 0;
}
Ack Discarded Records (AckDiscarded)
By default, filtered records advance the offset — but in MANUAL AckMode you may need to explicitly acknowledge them. Set ackDiscarded to handle this automatically:
factory.setAckDiscarded(true);
When true, the container calls acknowledge() on discarded records so their offsets are committed even in manual mode. When false (default), discarded records are silently dropped without acknowledgment — the offset still advances because the container manages it internally in automatic modes.
Batch Filtering
With setBatchListener(true), the container applies the filter to each record in the batch individually, delivering only the passing subset:
sequenceDiagram
participant Broker
participant Container
participant Filter
participant Listener
Broker-->>Container: poll() → [r40, r41, r42, r43, r44]
Container->>Filter: filter(r40) → false (keep)
Container->>Filter: filter(r41) → true (discard)
Container->>Filter: filter(r42) → false (keep)
Container->>Filter: filter(r43) → true (discard)
Container->>Filter: filter(r44) → false (keep)
Container->>Listener: onBatch([r40, r42, r44])
The same RecordFilterStrategy bean works for both single and batch listeners.
Combining Filter with Error Handler
Filtered records never reach the listener, so error handlers never see them. The two concerns are independent:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
inventoryListenerFactory(
ConsumerFactory<String, OrderPlacedEvent> consumerFactory,
RecordFilterStrategy<String, OrderPlacedEvent> placedOrderFilter,
DefaultErrorHandler errorHandler) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setRecordFilterStrategy(placedOrderFilter);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
Errors thrown inside the listener (for records that passed the filter) are handled by the error handler as normal.
RecordFilterStrategy vs @KafkaHandler
| Approach | Where filter runs | Failed records | Use when |
|---|---|---|---|
RecordFilterStrategy | Container, before listener | Silently discarded | Cheap, stateless checks on key/header/value field |
if/return in listener | Inside listener method | Offset advances, no error | You need access to database or external state |
@KafkaHandler dispatch | Inside class method dispatch | Routes by type | Multiple payload types on one topic |
Key Takeaways
RecordFilterStrategyis a@FunctionalInterface— returntrueto discard a record,falseto keep it- Filtering happens after deserialization but before your listener method is called
- Filtered records are silently dropped; their offsets still advance — do not use filtering as a retry mechanism
- Set
ackDiscarded(true)on the factory when usingMANUALAckMode to avoid stalled commits - The same strategy works for both single-record and batch listeners
- Keep filters cheap and stateless — they run on the polling thread for every record
Next: Pausing, Resuming, and Stopping Listener Containers — control listener container state at runtime for circuit breakers, back-pressure, and graceful shutdown.