Kafka Streams with Spring Boot: Stateless and Stateful Processing
Kafka Streams vs @KafkaListener
@KafkaListener is a consumer — it reads records and processes them one by one or in batches. Kafka Streams is a stream processing library — it builds a topology of transformations that runs continuously, with built-in state stores, windowed aggregations, and join operations.
| Aspect | @KafkaListener | Kafka Streams |
|---|---|---|
| Processing model | Consume and process | Topology of operators |
| Stateful processing | Manual (external DB) | Built-in state stores (RocksDB) |
| Windowed aggregations | Manual | Native (time, session, hopping) |
| Joins | Manual | KStream-KTable, KStream-KStream |
| Fault tolerance | Committed offsets | Changelog topics + offsets |
| Use when | Imperative event handling | Stream transformations and aggregations |
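For contrast, the same kind of EU-only handling written imperatively with @KafkaListener might look like the sketch below; the listener class and group id are illustrative, not part of the example application:
@Component
public class OrderListener {

    @KafkaListener(topics = "orders", groupId = "order-analytics-listener")
    public void onOrder(OrderPlacedEvent event) {
        // Imperative handling: one record at a time. Any running totals or joins
        // would have to be maintained by hand in an external store.
        if ("EU".equals(event.getRegion())) {
            // handle the EU order...
        }
    }
}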
Maven Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Auto-Configuration
Add @EnableKafkaStreams and configure application.properties:
@SpringBootApplication
@EnableKafkaStreams
public class OrderAnalyticsApplication { }
spring.application.name=order-analytics
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=order-analytics
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.replication-factor=1
application-id must be unique per application — it determines the consumer group and changelog topic prefix.
Topology Building with StreamsBuilder
Spring auto-creates a StreamsBuilder bean. Declare your topology in a @Bean method returning KStream or KTable:
@Configuration
public class OrderStreamTopology {
@Bean
public KStream<String, OrderPlacedEvent> orderStream(StreamsBuilder builder) {
return builder.stream("orders");
}
}
Stateless Processing — Filter and Map
flowchart LR
src["orders\nKStream<String, OrderPlacedEvent>"]
filter["filter()\nonly EU orders"]
map["mapValues()\nOrderPlacedEvent → AnalyticsEvent"]
sink["orders-analytics"]
src --> filter --> map --> sink
@Bean
public KStream<String, OrderPlacedEvent> orderStream(StreamsBuilder builder) {
KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));
orders
.filter((key, event) -> "EU".equals(event.getRegion()))
.mapValues(event -> new AnalyticsEvent(
event.getOrderId(), event.getTotalAmount(), event.getPlacedAt()
))
.to("orders-analytics",
Produced.with(Serdes.String(), new JsonSerde<>(AnalyticsEvent.class)));
return orders;
}
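Other stateless operators compose the same way. For example, if an order carried multiple line items, flatMapValues() would fan each record out into several. This is only a sketch: the getItems() accessor, LineItemEvent type, and order-line-items topic are hypothetical and not part of the model used elsewhere in this post:
orders
    .flatMapValues(event -> event.getItems().stream()          // hypothetical accessor
        .map(item -> new LineItemEvent(event.getOrderId(), item))
        .toList())
    .to("order-line-items",
        Produced.with(Serdes.String(), new JsonSerde<>(LineItemEvent.class)));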
Branching
Route records to different output topics based on a predicate:
Map<String, KStream<String, OrderPlacedEvent>> branches = orders
.split(Named.as("region-"))
.branch((key, event) -> "EU".equals(event.getRegion()),
Branched.as("eu"))
.branch((key, event) -> "US".equals(event.getRegion()),
Branched.as("us"))
.defaultBranch(Branched.as("other"));
branches.get("region-eu").to("orders-eu");
branches.get("region-us").to("orders-us");
branches.get("region-other").to("orders-other");
Stateful Processing — Running Revenue by Customer
flowchart LR
src["orders\nKStream"]
group["groupBy(customerId)"]
agg["aggregate()\ncustomer revenue total"]
store["State Store\n(RocksDB)"]
sink["customer-revenue\nKTable<String, Double>"]
src --> group --> agg --> store --> sink
@Bean
public KTable<String, Double> customerRevenue(StreamsBuilder builder) {
KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));
return orders
.groupBy(
(key, event) -> event.getCustomerId(),
Grouped.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class))
)
.aggregate(
() -> 0.0, // initializer
(customerId, event, total) -> total + event.getTotalAmount(), // aggregator
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-revenue-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
}
The Materialized state store persists to RocksDB locally and is backed by a Kafka changelog topic for fault tolerance.
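The changelog topic is named <application-id>-<store-name>-changelog, so here it is order-analytics-customer-revenue-store-changelog. Materialized also accepts further store options; a small sketch, where the caching and min.insync.replicas choices are illustrative rather than required:
// Drop-in variant of the Materialized argument used in aggregate() above.
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-revenue-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Double())
    .withCachingDisabled()                // forward every update downstream instead of batching
    .withLoggingEnabled(Map.of(           // extra config applied to the changelog topic
        TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));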
Windowed Aggregation — Orders Per Minute
@Bean
public KTable<Windowed<String>, Long> ordersPerMinute(StreamsBuilder builder) {
KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));
return orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count(Materialized.as("orders-per-minute-store"));
}
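The result is keyed by Windowed<String>, which carries both the original key and the window bounds. To publish the counts, convert the table back to a stream and flatten the windowed key; the orders-per-minute output topic and the key format below are illustrative:
// ordersPerMinute is the KTable<Windowed<String>, Long> returned above.
ordersPerMinute.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key() + "@" + windowedKey.window().startTime(), count))
    .to("orders-per-minute", Produced.with(Serdes.String(), Serdes.Long()));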
KStream-KTable Join — Enriching Orders with Customer Data
@Bean
public KStream<String, EnrichedOrder> enrichedOrders(StreamsBuilder builder) {
KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));
KTable<String, CustomerProfile> customers = builder.table("customer-profiles",
Consumed.with(Serdes.String(), new JsonSerde<>(CustomerProfile.class)));
return orders.join(
customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(
Serdes.String(),
new JsonSerde<>(OrderPlacedEvent.class),
new JsonSerde<>(CustomerProfile.class)
)
);
}
KStream-KTable join: for each order record, look up the customer profile by key. The join matches on record key, so the order stream must be keyed by customerId (re-key it with selectKey() first if it is not). The join is driven by the stream side: if no matching KTable record exists for an order's key, the join produces no output.
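If orders can arrive before the matching profile exists, leftJoin() keeps the order and passes null for the missing side; a sketch, assuming EnrichedOrder tolerates a null customer:
return orders.leftJoin(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer),   // customer is null if no profile yet
    Joined.with(
        Serdes.String(),
        new JsonSerde<>(OrderPlacedEvent.class),
        new JsonSerde<>(CustomerProfile.class)
    )
);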
Querying State Stores
Read aggregated state from outside the topology (e.g., in a REST endpoint):
@RestController
@RequiredArgsConstructor
public class RevenueController {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@GetMapping("/revenue/{customerId}")
public Double getRevenue(@PathVariable String customerId) {
KafkaStreams streams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Double> store = streams.store(
StoreQueryParameters.fromNameAndType(
"customer-revenue-store",
QueryableStoreTypes.keyValueStore()
)
);
Double revenue = store.get(customerId);
return revenue != null ? revenue : 0.0;
}
}
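State stores are only queryable once the streams instance is running; during startup or a rebalance, store() throws InvalidStateStoreException. A defensive variant of the lookup might catch it and signal the caller to retry (mapping it to 503 is one possible choice, not something the snippet above prescribes):
try {
    ReadOnlyKeyValueStore<String, Double> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            "customer-revenue-store", QueryableStoreTypes.keyValueStore()));
    Double revenue = store.get(customerId);
    return revenue != null ? revenue : 0.0;
} catch (InvalidStateStoreException e) {
    // Store is still initializing or migrating between instances; ask the client to retry.
    throw new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "State store not ready", e);
}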
Topology Overview
flowchart TD
subgraph Topology["Order Analytics Topology"]
I1["orders\n(source)"]
F1["filter(region=EU)"]
M1["mapValues(→AnalyticsEvent)"]
O1["orders-analytics\n(sink)"]
I1 --> F1 --> M1 --> O1
G1["groupBy(customerId)"]
A1["aggregate(revenue total)"]
S1["customer-revenue-store\n(RocksDB + changelog)"]
I1 --> G1 --> A1 --> S1
end
Key Takeaways
- @EnableKafkaStreams plus a StreamsBuilder @Bean is all the wiring needed; Spring Boot auto-configures the KafkaStreams instance
- Stateless operations (filter, map, flatMap) transform records without state; no external storage needed
- Stateful aggregations use local RocksDB state stores backed by Kafka changelog topics for fault tolerance
- KTable represents a compacted view of a topic; join it with a KStream to enrich records
- Query state stores via StreamsBuilderFactoryBean.getKafkaStreams() from REST endpoints
- application-id determines the consumer group and changelog topic prefix; each unique application-id creates its own state
Next: KafkaAdmin and AdminClient: Managing Topics Programmatically — create, modify, and delete Kafka topics from Spring Boot using KafkaAdmin and AdminClient.