@SendTo and @KafkaHandler: Chaining Consumers and Multi-Type Dispatch
@SendTo — Chaining Listeners
`@SendTo` on a `@KafkaListener` method automatically sends the return value to another Kafka topic. This is how you build event pipelines without manually calling `KafkaTemplate.send()` in your listener.
```mermaid
flowchart LR
    T1["orders\n(OrderPlacedEvent)"]
    T2["orders-confirmed\n(OrderConfirmedEvent)"]
    T3["inventory-events\n(StockReservedEvent)"]
    T1 -->|"@KafkaListener\n@SendTo"| L1["confirmOrder()"]
    L1 --> T2
    T2 -->|"@KafkaListener\n@SendTo"| L2["reserveStock()"]
    L2 --> T3
```
Basic @SendTo
```java
@KafkaListener(topics = "orders", groupId = "confirmation-service")
@SendTo("orders-confirmed")
public OrderConfirmedEvent onOrder(OrderPlacedEvent event) {
    // Return value is automatically sent to "orders-confirmed"
    return new OrderConfirmedEvent(
        event.getOrderId(),
        event.getCustomerId(),
        Instant.now().toEpochMilli()
    );
}
```
The return type must be serializable by the `KafkaTemplate` wired in as the reply template. If the method returns `null`, nothing is sent.
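This makes a `null` return a lightweight filter. As a sketch (the `fraudChecker` collaborator and its `isSuspicious` method are hypothetical):

```java
@KafkaListener(topics = "orders", groupId = "confirmation-service")
@SendTo("orders-confirmed")
public OrderConfirmedEvent onOrder(OrderPlacedEvent event) {
    if (fraudChecker.isSuspicious(event)) {
        return null; // suppress the reply: nothing is forwarded downstream
    }
    return new OrderConfirmedEvent(event.getOrderId());
}
```

The record is still acknowledged normally; only the outbound send is skipped.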
Configuring the Reply Template
`@SendTo` uses the `replyTemplate` on the listener container factory. Wire the `KafkaTemplate`:
```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory,
        KafkaTemplate<String, Object> kafkaTemplate) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}
```
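For completeness, the `KafkaTemplate` being wired in might be defined as follows — a minimal sketch, assuming a localhost broker and JSON-serialized values:

```java
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    // The same template serves both application sends and @SendTo replies
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
```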
Dynamic Topic via SpEL
The destination topic can be computed at runtime:
```java
@KafkaListener(topics = "orders")
@SendTo("!{@topicRouter.resolve(request.value())}")
public OrderConfirmedEvent onOrder(OrderPlacedEvent event) {
    return new OrderConfirmedEvent(event.getOrderId());
}

@Component("topicRouter")
public class TopicRouter {
    public String resolve(OrderPlacedEvent event) {
        return "EU".equals(event.getRegion()) ? "orders-confirmed-eu" : "orders-confirmed";
    }
}
```
Runtime `!{...}` expressions are evaluated against a root object that exposes `request` (the inbound `ConsumerRecord`), `source` (the `Message<?>` converted from it), and `result` (the method's return value) — so `request.value()` is the deserialized event.
Preserving the Correlation ID
When chaining listeners, preserve the original record’s key and headers in the reply:
```java
@KafkaListener(topics = "orders")
@SendTo("orders-confirmed")
public Message<OrderConfirmedEvent> onOrder(
        OrderPlacedEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String originalKey) {
    OrderConfirmedEvent confirmed = new OrderConfirmedEvent(event.getOrderId());
    return MessageBuilder.withPayload(confirmed)
        .setHeader(KafkaHeaders.KEY, originalKey) // preserve original key
        .setHeader(KafkaHeaders.TOPIC, "orders-confirmed")
        .build();
}
```
Returning a `Message<T>` instead of `T` directly gives full control over headers and routing metadata.
@KafkaHandler — Multi-Type Dispatch
A single `@KafkaListener` class can handle multiple event types by declaring multiple `@KafkaHandler` methods. Spring Kafka dispatches to the right method based on the deserialized type.
```java
@Component
@KafkaListener(topics = "order-events", groupId = "notification-service")
public class OrderEventDispatcher {

    private static final Logger log = LoggerFactory.getLogger(OrderEventDispatcher.class);

    private final NotificationService notificationService;

    public OrderEventDispatcher(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @KafkaHandler
    public void onOrderPlaced(OrderPlacedEvent event) {
        notificationService.sendOrderConfirmation(event.getCustomerId(), event.getOrderId());
    }

    @KafkaHandler
    public void onOrderCancelled(OrderCancelledEvent event) {
        notificationService.sendCancellationNotice(event.getCustomerId(), event.getOrderId());
    }

    @KafkaHandler
    public void onOrderShipped(OrderShippedEvent event) {
        notificationService.sendShippingUpdate(event.getCustomerId(), event.getTrackingId());
    }

    @KafkaHandler(isDefault = true)
    public void onUnknown(Object event) {
        log.warn("Unknown event type: {}", event.getClass().getName());
    }
}
```
`isDefault = true` catches any event type that doesn’t match a specific handler. Without a default handler, an unmatched type causes a `ListenerExecutionFailedException`.
Type Resolution for @KafkaHandler
Spring Kafka uses the deserialized value’s runtime type to select the handler. The `ConsumerFactory` must deserialize to the base type or `Object` — use a polymorphic deserializer:
```java
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
    // No VALUE_DEFAULT_TYPE — let the __TypeId__ header determine the type
    return new DefaultKafkaConsumerFactory<>(props);
}
```
The producer must leave `JsonSerializer.ADD_TYPE_INFO_HEADERS` at its default of `true` so the `__TypeId__` header is present for dispatch.
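A matching producer configuration might look like this — a sketch in which `ADD_TYPE_INFO_HEADERS` is set explicitly only for emphasis, since `true` is already the default:

```java
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true); // writes __TypeId__ on each record
    return new DefaultKafkaProducerFactory<>(props);
}
```

If a producer outside this stack writes the topic, it must add a compatible `__TypeId__` header itself, or dispatch falls through to the default handler.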
@SendTo Combined with @KafkaHandler
Each handler method can forward its result to a different output topic:
```java
@Component
@KafkaListener(topics = "order-events", groupId = "pipeline-service")
public class OrderPipelineProcessor {

    @KafkaHandler
    @SendTo("orders-confirmed")
    public OrderConfirmedEvent onPlaced(OrderPlacedEvent event) {
        return new OrderConfirmedEvent(event.getOrderId());
    }

    @KafkaHandler
    @SendTo("inventory-releases")
    public StockReleasedEvent onCancelled(OrderCancelledEvent event) {
        return new StockReleasedEvent(event.getOrderId(), event.getItems());
    }
}
```
@SendTo vs Manual KafkaTemplate.send()
| Aspect | `@SendTo` | `KafkaTemplate.send()` |
|---|---|---|
| Code | Return value, zero boilerplate | Explicit call, full control |
| Headers | Correlation header propagated; return `Message<T>` for the rest | Manual |
| Topic | Static or SpEL | Full runtime flexibility |
| Transactions | Participates in the container transaction | Must be managed manually |
| Error handling | Uses the container error handler | Handle exceptions yourself |
Use `@SendTo` for linear pipelines. Use `KafkaTemplate.send()` when you need multiple output topics, conditional sends, or fine-grained header control.
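The manual alternative might look like this — a sketch of conditional, multi-topic fan-out; the topic names and the `isPriority()` accessor are assumptions for illustration:

```java
@KafkaListener(topics = "orders", groupId = "fanout-service")
public void onOrder(OrderPlacedEvent event) {
    // Every order produces a confirmation...
    kafkaTemplate.send("orders-confirmed", event.getOrderId(),
            new OrderConfirmedEvent(event.getOrderId()));
    // ...but only some records fan out to a second topic — awkward with
    // @SendTo (one return value, one destination), natural here
    if (event.isPriority()) {
        kafkaTemplate.send("orders-priority", event.getOrderId(), event);
    }
}
```

The trade-off is that these sends bypass the container's reply plumbing, so error handling and (if used) transactions must be wired up explicitly.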
Key Takeaways
- `@SendTo("topicName")` on a `@KafkaListener` method sends the return value to the specified topic — wire `setReplyTemplate(kafkaTemplate)` on the factory
- Return `null` to suppress the reply without an error
- Return `Message<T>` instead of `T` to control headers and the destination topic per-record; the `@SendTo` destination can also be computed via SpEL
- `@KafkaListener` on a class + `@KafkaHandler` on methods enables type-based dispatch — declare a default handler for unknown types
- Type dispatch depends on the `__TypeId__` header — ensure the producer adds type headers and the consumer trusts the package
- `@SendTo` participates in the container’s transaction automatically; `KafkaTemplate.send()` requires manual transaction management
Next: Request-Reply Pattern with `ReplyingKafkaTemplate` — implement synchronous request-reply over Kafka using `ReplyingKafkaTemplate` and `@SendTo`.