@SendTo and @KafkaHandler: Chaining Consumers and Multi-Type Dispatch

@SendTo — Chaining Listeners

@SendTo on a @KafkaListener method automatically sends the return value to another Kafka topic. This is how you build event pipelines without manually calling KafkaTemplate.send() in your listener.

flowchart LR
    T1["orders\n(OrderPlacedEvent)"]
    T2["orders-confirmed\n(OrderConfirmedEvent)"]
    T3["inventory-events\n(StockReservedEvent)"]

    T1 -->|"@KafkaListener\n@SendTo"| L1["confirmOrder()"]
    L1 --> T2
    T2 -->|"@KafkaListener\n@SendTo"| L2["reserveStock()"]
    L2 --> T3

Basic @SendTo

@KafkaListener(topics = "orders", groupId = "confirmation-service")
@SendTo("orders-confirmed")
public OrderConfirmedEvent onOrder(OrderPlacedEvent event) {
    // Return value is automatically sent to "orders-confirmed"
    return new OrderConfirmedEvent(
        event.getOrderId(),
        event.getCustomerId(),
        Instant.now().toEpochMilli()
    );
}

The return value must be serializable by the value serializer of the KafkaTemplate configured as the reply template. If the method returns null, nothing is sent.


Configuring the Reply Template

@SendTo uses the replyTemplate on the listener container factory. Wire the KafkaTemplate:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
        kafkaListenerContainerFactory(
                ConsumerFactory<String, Object> consumerFactory,
                KafkaTemplate<String, Object> kafkaTemplate) {

    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}

Dynamic Topic via SpEL

The destination topic can be computed at runtime:

@KafkaListener(topics = "orders")
@SendTo("!{@topicRouter.resolve(request.value())}")
public OrderConfirmedEvent onOrder(OrderPlacedEvent event) {
    return new OrderConfirmedEvent(event.getOrderId());
}

@Component("topicRouter")
public class TopicRouter {
    public String resolve(OrderPlacedEvent event) {
        return "EU".equals(event.getRegion()) ? "orders-confirmed-eu" : "orders-confirmed";
    }
}

The !{...} form is evaluated at runtime for each record. The root object of the expression exposes three properties: request is the inbound ConsumerRecord, source is the Message<?> converted from that record, and result is the method's return value. request.value() is therefore the deserialized event.
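Because the routing rule is an ordinary method, it can be exercised without a broker or a Spring context. The sketch below is a hypothetical stand-alone check: the two-field OrderPlacedEvent and the TopicRouterCheck class are assumptions made for illustration, and the router is the same rule as above minus the @Component annotation.

```java
// Hypothetical minimal event; a real OrderPlacedEvent will carry more fields.
final class OrderPlacedEvent {
    private final String orderId;
    private final String region;

    OrderPlacedEvent(String orderId, String region) {
        this.orderId = orderId;
        this.region = region;
    }

    String getOrderId() { return orderId; }
    String getRegion() { return region; }
}

// Same routing rule as the topicRouter bean, without the Spring annotation.
final class TopicRouter {
    String resolve(OrderPlacedEvent event) {
        return "EU".equals(event.getRegion()) ? "orders-confirmed-eu" : "orders-confirmed";
    }
}

final class TopicRouterCheck {
    public static void main(String[] args) {
        TopicRouter router = new TopicRouter();
        // EU orders go to the regional topic
        System.out.println(router.resolve(new OrderPlacedEvent("o-1", "EU")));
        // Any other region, including a missing one, falls back to the default topic
        System.out.println(router.resolve(new OrderPlacedEvent("o-2", "US")));
        System.out.println(router.resolve(new OrderPlacedEvent("o-3", null)));
    }
}
```

Writing the comparison as "EU".equals(...) rather than event.getRegion().equals("EU") is what makes the null-region case safe.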


Preserving the Correlation ID

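When chaining listeners, carry the original record's key into the reply so downstream consumers can still correlate events by it. Any header you need to propagate, such as a correlation ID, can be set the same way: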

@KafkaListener(topics = "orders")
@SendTo("orders-confirmed")
public Message<OrderConfirmedEvent> onOrder(
        OrderPlacedEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String originalKey) {

    OrderConfirmedEvent confirmed = new OrderConfirmedEvent(event.getOrderId());

    return MessageBuilder.withPayload(confirmed)
        .setHeader(KafkaHeaders.KEY, originalKey)  // preserve original key
        .setHeader(KafkaHeaders.TOPIC, "orders-confirmed")
        .build();
}

Returning a Message<T> instead of T directly gives full control over headers and routing metadata.
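For listeners that return a plain T rather than a Message<T>, selected inbound headers can instead be carried over at the factory level. The following is a minimal sketch, assuming inbound records carry a hypothetical x-trace-id header; the class name ReplyHeaderConfig is also an assumption. setReplyHeadersConfigurer takes a predicate over the header name and value, and matching inbound headers are copied to the reply.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class ReplyHeaderConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory);
        factory.setReplyTemplate(kafkaTemplate);
        // Copy only the (hypothetical) x-trace-id header from the inbound record to the reply
        factory.setReplyHeadersConfigurer((name, value) -> "x-trace-id".equals(name));
        return factory;
    }
}
```

This keeps listener methods free of header plumbing, at the cost of applying the same rule to every listener created by the factory.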


@KafkaHandler — Multi-Type Dispatch

A single @KafkaListener class can handle multiple event types by declaring multiple @KafkaHandler methods. Spring Kafka dispatches to the right method based on the deserialized type.

@Component
@KafkaListener(topics = "order-events", groupId = "notification-service")
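public class OrderEventDispatcher {

    // SLF4J logger (org.slf4j.Logger and org.slf4j.LoggerFactory)
    private static final Logger log = LoggerFactory.getLogger(OrderEventDispatcher.class);

    // NotificationService is an application-specific collaborator, injected by Spring
    private final NotificationService notificationService;

    public OrderEventDispatcher(NotificationService notificationService) {
        this.notificationService = notificationService;
    }
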
    @KafkaHandler
    public void onOrderPlaced(OrderPlacedEvent event) {
        notificationService.sendOrderConfirmation(event.getCustomerId(), event.getOrderId());
    }

    @KafkaHandler
    public void onOrderCancelled(OrderCancelledEvent event) {
        notificationService.sendCancellationNotice(event.getCustomerId(), event.getOrderId());
    }

    @KafkaHandler
    public void onOrderShipped(OrderShippedEvent event) {
        notificationService.sendShippingUpdate(event.getCustomerId(), event.getTrackingId());
    }

    @KafkaHandler(isDefault = true)
    public void onUnknown(Object event) {
        log.warn("Unknown event type: {}", event.getClass().getName());
    }
}

isDefault = true catches any event type that doesn’t match a specific handler. Without a default handler, an unmatched type throws a ListenerExecutionFailedException.
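Conceptually, this dispatch amounts to "find a handler whose parameter type accepts the payload, otherwise fall back to the default". The following is a simplified plain-Java model of that idea, not Spring Kafka's actual implementation; TypeDispatcher, the three event classes, and the returned strings are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical event types standing in for the real event classes.
final class Placed { final String orderId; Placed(String orderId) { this.orderId = orderId; } }
final class Cancelled { final String orderId; Cancelled(String orderId) { this.orderId = orderId; } }
final class Refunded { final String orderId; Refunded(String orderId) { this.orderId = orderId; } }

final class TypeDispatcher {
    // Registration order is preserved so lookup is deterministic
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    TypeDispatcher(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Register a handler for one payload type (the analogue of a @KafkaHandler method)
    <T> TypeDispatcher on(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Pick the first handler whose type accepts the payload, else use the default
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }
}

final class DispatchDemo {
    static TypeDispatcher dispatcher() {
        return new TypeDispatcher(p -> "unknown:" + p.getClass().getSimpleName())
            .on(Placed.class, e -> "confirmation:" + e.orderId)
            .on(Cancelled.class, e -> "cancellation:" + e.orderId);
    }

    public static void main(String[] args) {
        TypeDispatcher d = dispatcher();
        System.out.println(d.dispatch(new Placed("o-1")));     // prints confirmation:o-1
        System.out.println(d.dispatch(new Cancelled("o-2")));  // prints cancellation:o-2
        System.out.println(d.dispatch(new Refunded("o-3")));   // prints unknown:Refunded
    }
}
```

Refunded has no registered handler, so it lands in the default branch, which mirrors the role of the isDefault = true method.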


Type Resolution for @KafkaHandler

Spring Kafka selects the handler from the runtime type of the deserialized value, so the deserializer must be able to produce each concrete event class. With JsonDeserializer, leave the default type unset and let the type header decide:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
    // No VALUE_DEFAULT_TYPE — let the __TypeId__ header determine the type
    return new DefaultKafkaConsumerFactory<>(props);
}

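On the producer side, JsonSerializer.ADD_TYPE_INFO_HEADERS must remain true (the default) so that each record carries the __TypeId__ header used for dispatch. The header holds the producer's class name, so the consumer needs the same class on its classpath or a type mapping that translates it.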
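A matching producer configuration might look like the sketch below. It assumes the same local broker address as the consumer example and a hypothetical configuration class named EventProducerConfig. Setting the flag explicitly is redundant with the default and is shown only to make the dependency visible.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class EventProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Already the default; stated explicitly because handler dispatch relies on it
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```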


@SendTo Combined with @KafkaHandler

Each handler method can forward its result to a different output topic:

@Component
@KafkaListener(topics = "order-events", groupId = "pipeline-service")
public class OrderPipelineProcessor {

    @KafkaHandler
    @SendTo("orders-confirmed")
    public OrderConfirmedEvent onPlaced(OrderPlacedEvent event) {
        return new OrderConfirmedEvent(event.getOrderId());
    }

    @KafkaHandler
    @SendTo("inventory-releases")
    public StockReleasedEvent onCancelled(OrderCancelledEvent event) {
        return new StockReleasedEvent(event.getOrderId(), event.getItems());
    }
}

@SendTo vs Manual KafkaTemplate.send()

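| Aspect | @SendTo | KafkaTemplate.send() |
|---|---|---|
| Code | Return value, zero boilerplate | Explicit call, full control |
| Headers | Correlation ID copied when present; other headers via a ReplyHeadersConfigurer or a returned Message<T> | Set manually on each record |
| Topic | Static or SpEL | Full runtime flexibility |
| Transactions | Participates in the container transaction | Also participates when called on the listener thread with a template from the same transactional producer factory; otherwise managed manually |
| Error handling | Uses container error handler | Must handle exceptions yourself |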

Use @SendTo for linear pipelines. Use KafkaTemplate.send() when you need to send to multiple topics, send conditionally, or control headers in fine detail.


Key Takeaways

  • @SendTo("topicName") on a @KafkaListener method sends the return value to the specified topic — wire setReplyTemplate(kafkaTemplate) on the factory
  • Return null to suppress the reply without an error
  • Return Message<T> instead of T to control headers on the reply; use a runtime SpEL expression (!{...}) in @SendTo to compute the destination topic per record
  • @KafkaListener on a class + @KafkaHandler on methods enables type-based dispatch — declare a default handler for unknown types
  • Type dispatch depends on the __TypeId__ header — ensure the producer adds type headers and the consumer trusts the package
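  • @SendTo participates in the container’s transaction automatically; KafkaTemplate.send() does too when called on the listener thread with a template from the same transactional producer factory, and needs explicit transaction handling elsewhere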

Next: Request-Reply Pattern with ReplyingKafkaTemplate — implement synchronous request-reply over Kafka using ReplyingKafkaTemplate and @SendTo.