Kafka Streams with Spring Boot: Stateless and Stateful Processing

Kafka Streams vs @KafkaListener

@KafkaListener is a consumer — it reads records and processes them one by one or in batches. Kafka Streams is a stream processing library — it builds a topology of transformations that runs continuously, with built-in state stores, windowed aggregations, and join operations.

| Aspect | @KafkaListener | Kafka Streams |
|---|---|---|
| Processing model | Consume and process | Topology of operators |
| Stateful processing | Manual (external DB) | Built-in state stores (RocksDB) |
| Windowed aggregations | Manual | Native (time, session, hopping) |
| Joins | Manual | KStream-KTable, KStream-KStream |
| Fault tolerance | Committed offsets | Changelog topics + offsets |
| Use when | Imperative event handling | Stream transformations and aggregations |
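
For contrast, here is the "EU orders only" logic from the Streams examples below written as a plain @KafkaListener. This is a sketch: OrderPlacedEvent is the event type used throughout this article, and the body of the handler is a placeholder for whatever imperative processing you would do.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    // One record at a time, imperative style: the listener decides
    // what to do with each event as it arrives.
    @KafkaListener(topics = "orders", groupId = "order-analytics")
    public void onOrder(OrderPlacedEvent event) {
        if ("EU".equals(event.getRegion())) {
            // imperative handling: call a service, write to a database, ...
        }
    }
}
```

Everything state-related (dedup, aggregation, joins) is on you in this model, which is exactly what the Streams API takes over in the sections below.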

Maven Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Auto-Configuration

Add @EnableKafkaStreams and configure application.properties:

@SpringBootApplication
@EnableKafkaStreams
public class OrderAnalyticsApplication { }

spring.application.name=order-analytics
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=order-analytics
spring.kafka.streams.default-key-serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.default-value-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.replication-factor=1

application-id must be unique per application — it determines the consumer group and changelog topic prefix.


Topology Building with StreamsBuilder

With @EnableKafkaStreams, Spring auto-configures a StreamsBuilder bean. Declare your topology in a @Bean method that takes the builder and returns the KStream or KTable it defines — returning it registers the method so Spring invokes it during context startup:

@Configuration
public class OrderStreamTopology {

    @Bean
    public KStream<String, OrderPlacedEvent> orderStream(StreamsBuilder builder) {
        return builder.stream("orders");
    }
}
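
For debugging, it can help to print the topology the builder has accumulated. A small sketch, assuming the same injected StreamsBuilder — Topology.describe() lists every source, processor, and sink with its connections:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyLogger {

    // Print the operator graph for local inspection before deploying.
    static void logTopology(StreamsBuilder builder) {
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```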

Stateless Processing — Filter and Map

flowchart LR
    src["orders\nKStream<String, OrderPlacedEvent>"]
    filter["filter()\nonly EU orders"]
    map["mapValues()\nOrderPlacedEvent → AnalyticsEvent"]
    sink["orders-analytics"]

    src --> filter --> map --> sink

@Bean
public KStream<String, OrderPlacedEvent> orderStream(StreamsBuilder builder) {
    KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));

    orders
        .filter((key, event) -> "EU".equals(event.getRegion()))
        .mapValues(event -> new AnalyticsEvent(
            event.getOrderId(), event.getTotalAmount(), event.getPlacedAt()
        ))
        .to("orders-analytics",
            Produced.with(Serdes.String(), new JsonSerde<>(AnalyticsEvent.class)));

    return orders;
}
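
The filter/map topology above can be exercised without a broker using kafka-streams-test-utils' TopologyTestDriver. A sketch: OrderPlacedEvent, AnalyticsEvent, and the topology construction are assumed from the example above, and the OrderPlacedEvent constructor arguments are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.springframework.kafka.support.serializer.JsonSerde;

class StatelessTopologyTest {

    void filtersAndMapsEuOrders() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        StreamsBuilder builder = new StreamsBuilder();
        // ... build the same topology as in orderStream(builder) ...

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, OrderPlacedEvent> in = driver.createInputTopic(
                "orders", Serdes.String().serializer(),
                new JsonSerde<>(OrderPlacedEvent.class).serializer());
            TestOutputTopic<String, AnalyticsEvent> out = driver.createOutputTopic(
                "orders-analytics", Serdes.String().deserializer(),
                new JsonSerde<>(AnalyticsEvent.class).deserializer());

            // Pipe a record through and read the transformed result —
            // no broker, no consumer group, fully in-process.
            in.pipeInput("order-1", new OrderPlacedEvent(/* region = "EU", ... */));
            // out.readKeyValue() now holds the mapped AnalyticsEvent
        }
    }
}
```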

Branching

Route records to different output topics based on a predicate:

Map<String, KStream<String, OrderPlacedEvent>> branches = orders
    .split(Named.as("region-"))
    .branch((key, event) -> "EU".equals(event.getRegion()),
            Branched.as("eu"))
    .branch((key, event) -> "US".equals(event.getRegion()),
            Branched.as("us"))
    .defaultBranch(Branched.as("other"));

branches.get("region-eu").to("orders-eu");
branches.get("region-us").to("orders-us");
branches.get("region-other").to("orders-other");
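
The three branch predicates above reduce to routing on the region field. A plain-Java sketch of the same routing decision, using the topic names from the example:

```java
public class RegionRouting {

    // Mirrors the split/branch predicates: EU and US get dedicated
    // topics, everything else falls through to the default branch.
    static String topicFor(String region) {
        if ("EU".equals(region)) return "orders-eu";
        if ("US".equals(region)) return "orders-us";
        return "orders-other";
    }

    public static void main(String[] args) {
        System.out.println(topicFor("EU")); // orders-eu
        System.out.println(topicFor("JP")); // orders-other
    }
}
```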

Stateful Processing — Running Revenue by Customer

flowchart LR
    src["orders\nKStream"]
    group["groupBy(customerId)"]
    agg["aggregate()\ncustomer revenue total"]
    store["State Store\n(RocksDB)"]
    sink["customer-revenue\nKTable<String, Double>"]

    src --> group --> agg --> store --> sink

@Bean
public KTable<String, Double> customerRevenue(StreamsBuilder builder) {
    KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));

    return orders
        .groupBy(
            (key, event) -> event.getCustomerId(),
            Grouped.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class))
        )
        .aggregate(
            () -> 0.0,                                    // initializer
            (customerId, event, total) -> total + event.getTotalAmount(),  // aggregator
            Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-revenue-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Double())
        );
}

The Materialized state store persists to RocksDB locally and is backed by a Kafka changelog topic for fault tolerance.
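
Kafka Streams derives the changelog topic name from the application-id and the store name, following the pattern `<application.id>-<store name>-changelog`. A plain-Java sketch of that convention with the names used in this article:

```java
public class ChangelogNaming {

    // Kafka Streams names changelog topics <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // With the application-id and store name from this article:
        System.out.println(changelogTopic("order-analytics", "customer-revenue-store"));
        // order-analytics-customer-revenue-store-changelog
    }
}
```

This is why the application-id must be unique: two applications sharing one id would also share changelog topics and state.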


Windowed Aggregation — Orders Per Minute

@Bean
public KTable<Windowed<String>, Long> ordersPerMinute(StreamsBuilder builder) {
    KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));

    return orders
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count(Materialized.as("orders-per-minute-store"));
}
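
Tumbling windows like the one above are aligned to the epoch: a record's window starts at its timestamp rounded down to a multiple of the window size. A plain-Java sketch of that alignment rule:

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignment {

    // Epoch-aligned tumbling window: the window containing a timestamp
    // starts at (timestamp - timestamp % windowSize).
    static long windowStart(long timestampMs, Duration windowSize) {
        long size = windowSize.toMillis();
        return timestampMs - (timestampMs % size);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-05-01T10:30:45Z").toEpochMilli();
        long start = windowStart(ts, Duration.ofMinutes(1));
        System.out.println(Instant.ofEpochMilli(start)); // 2024-05-01T10:30:00Z
    }
}
```

So an order placed at 10:30:45 is counted in the [10:30:00, 10:31:00) window, and with ofSizeWithNoGrace, records arriving after the window closes are dropped rather than back-filled.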

KStream-KTable Join — Enriching Orders with Customer Data

@Bean
public KStream<String, EnrichedOrder> enrichedOrders(StreamsBuilder builder) {
    KStream<String, OrderPlacedEvent> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), new JsonSerde<>(OrderPlacedEvent.class)));

    KTable<String, CustomerProfile> customers = builder.table("customer-profiles",
        Consumed.with(Serdes.String(), new JsonSerde<>(CustomerProfile.class)));

    return orders.join(
        customers,
        (order, customer) -> new EnrichedOrder(order, customer),
        Joined.with(
            Serdes.String(),
            new JsonSerde<>(OrderPlacedEvent.class),
            new JsonSerde<>(CustomerProfile.class)
        )
    );
}

KStream-KTable join: for each order record, look up the customer profile by key. The join is driven by the stream side — if no matching KTable record exists, the join produces no output.
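
The lookup semantics can be illustrated in plain Java: the KTable acts as a keyed map, each stream record probes it by key, and a miss produces no output. A minimal sketch (String stands in for the event and profile types):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSemantics {

    // Stream-driven inner join: for each stream record, look up the
    // table by key; no match means the record is silently dropped,
    // as with KStream.join.
    static Map<String, String> join(List<String[]> streamRecords,
                                    Map<String, String> table) {
        Map<String, String> out = new HashMap<>();
        for (String[] record : streamRecords) { // record = {key, value}
            String profile = table.get(record[0]);
            if (profile != null) {
                out.put(record[0], record[1] + "+" + profile);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> customers = Map.of("c1", "Alice");
        List<String[]> orders = List.of(
            new String[]{"c1", "order-1"},  // profile exists → joined
            new String[]{"c2", "order-2"}   // no profile → dropped
        );
        System.out.println(join(orders, customers)); // {c1=order-1+Alice}
    }
}
```

If dropped records are a problem, leftJoin keeps the stream record and passes null for the missing table side instead.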


Querying State Stores

Read aggregated state from outside the topology (e.g., in a REST endpoint):

@RestController
@RequiredArgsConstructor
public class RevenueController {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @GetMapping("/revenue/{customerId}")
    public Double getRevenue(@PathVariable String customerId) {
        KafkaStreams streams = streamsBuilderFactoryBean.getKafkaStreams();

        ReadOnlyKeyValueStore<String, Double> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "customer-revenue-store",
                QueryableStoreTypes.keyValueStore()
            )
        );

        Double revenue = store.get(customerId);
        return revenue != null ? revenue : 0.0;
    }
}
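
State stores are not queryable while the application is starting up or rebalancing; during that window streams.store(...) throws InvalidStateStoreException. A hedged sketch of a retry guard for the controller above (the retry count and back-off are arbitrary choices):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreAccess {

    // Retry the store lookup until the store becomes queryable again
    // after startup or a rebalance.
    static ReadOnlyKeyValueStore<String, Double> revenueStore(KafkaStreams streams) {
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                    "customer-revenue-store", QueryableStoreTypes.keyValueStore()));
            } catch (InvalidStateStoreException e) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(ie);
                }
            }
        }
        throw new IllegalStateException("Store not queryable after retries");
    }
}
```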

Topology Overview

flowchart TD
    subgraph Topology["Order Analytics Topology"]
        I1["orders\n(source)"]
        F1["filter(region=EU)"]
        M1["mapValues(→AnalyticsEvent)"]
        O1["orders-analytics\n(sink)"]

        I1 --> F1 --> M1 --> O1

        G1["groupBy(customerId)"]
        A1["aggregate(revenue total)"]
        S1["customer-revenue-store\n(RocksDB + changelog)"]

        I1 --> G1 --> A1 --> S1
    end

Key Takeaways

  • @EnableKafkaStreams + a StreamsBuilder @Bean is all the wiring needed — Spring Boot auto-configures the KafkaStreams instance
  • Stateless operations (filter, map, flatMap) transform records without state — no external storage needed
  • Stateful aggregations use local RocksDB state stores backed by Kafka changelog topics for fault tolerance
  • KTable represents a compacted view of a topic — join it with a KStream to enrich records
  • Query state stores via StreamsBuilderFactoryBean.getKafkaStreams() from REST endpoints
  • application-id determines the consumer group and changelog topic prefix — each unique application-id creates its own state

Next: KafkaAdmin and AdminClient: Managing Topics Programmatically — create, modify, and delete Kafka topics from Spring Boot using KafkaAdmin and AdminClient.