Request-Reply Pattern with ReplyingKafkaTemplate
When Kafka Needs to Be Synchronous
Kafka is designed for asynchronous event streaming. But some flows genuinely need a response: a payment validation service that must confirm before the order proceeds, or a pricing engine that must return the current price before checkout completes. ReplyingKafkaTemplate gives you a blocking send-and-receive call over Kafka without leaving the Kafka ecosystem.
How Request-Reply Works
sequenceDiagram
    participant Requester as Order Service<br/>(ReplyingKafkaTemplate)
    participant Broker
    participant Replier as Pricing Service<br/>(@KafkaListener + @SendTo)
    Requester->>Broker: send to "pricing-requests"<br/>kafka_correlationId=UUID<br/>kafka_replyTopic=pricing-replies
    Broker-->>Replier: deliver request
    Replier->>Broker: send reply to "pricing-replies"<br/>kafka_correlationId=UUID (echoed)
    Broker-->>Requester: deliver reply matching correlationId
    Requester->>Requester: CompletableFuture resolved
The correlation ID links request to reply. ReplyingKafkaTemplate manages a map of in-flight correlation IDs internally — each sendAndReceive() call adds an entry and completes when the matching reply arrives.
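That bookkeeping can be sketched in plain Java. CorrelationTracker below is a hypothetical stand-in for the template's internal map, not a Spring Kafka class: the request path registers a pending future under the correlation ID, and the reply consumer completes and removes the matching entry.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the in-flight map ReplyingKafkaTemplate keeps internally.
public class CorrelationTracker<R> {

    private final Map<UUID, CompletableFuture<R>> inFlight = new ConcurrentHashMap<>();

    // Request path: register a pending future under the new correlation ID.
    public CompletableFuture<R> register(UUID correlationId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        inFlight.put(correlationId, future);
        return future;
    }

    // Reply path: complete and remove the matching entry, if one exists.
    public boolean complete(UUID correlationId, R reply) {
        CompletableFuture<R> future = inFlight.remove(correlationId);
        if (future == null) {
            return false; // unknown or already timed-out correlation ID
        }
        return future.complete(reply);
    }
}
```

A reply whose correlation ID is not in the map (for example, one that arrives after the timeout already removed the entry) is simply discarded.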
Dependencies
No additional dependencies — ReplyingKafkaTemplate is included in spring-kafka.
Requester Configuration
@Configuration
public class RequestReplyConfig {

    @Bean
    public ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> replyingKafkaTemplate(
            ProducerFactory<String, PriceRequest> producerFactory,
            ConcurrentMessageListenerContainer<String, PriceResponse> replyContainer) {
        ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        template.setDefaultReplyTimeout(Duration.ofSeconds(10));
        template.setSharedReplyTopic(false); // the reply topic is exclusive to this instance
        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, PriceResponse> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, PriceResponse> factory) {
        ConcurrentMessageListenerContainer<String, PriceResponse> container =
                factory.createContainer("pricing-replies");
        container.getContainerProperties().setGroupId("order-service-replies");
        container.setAutoStartup(false); // ReplyingKafkaTemplate starts it
        return container;
    }
}
Sending a Request and Awaiting the Reply
@Service
@RequiredArgsConstructor
public class PricingClient {

    private final ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template;

    public PriceResponse getPrice(String productId, int quantity) {
        PriceRequest request = new PriceRequest(productId, quantity);
        ProducerRecord<String, PriceRequest> record =
                new ProducerRecord<>("pricing-requests", productId, request);
        RequestReplyFuture<String, PriceRequest, PriceResponse> future =
                template.sendAndReceive(record);
        try {
            ConsumerRecord<String, PriceResponse> reply = future.get(10, TimeUnit.SECONDS);
            return reply.value();
        } catch (TimeoutException e) {
            throw new PricingServiceTimeoutException("Pricing service did not respond");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new PricingServiceException("Interrupted while waiting for price", e);
        } catch (ExecutionException e) {
            throw new PricingServiceException("Failed to get price", e);
        }
    }
}
sendAndReceive() returns a RequestReplyFuture<K, V, R>, which (as of Spring Kafka 3.0) extends CompletableFuture<ConsumerRecord<K, R>> and completes when the matching reply arrives.
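Because it is a CompletableFuture, callers that can tolerate asynchrony should compose on it rather than block in get(). The shape of that pattern is sketched below with a plain CompletableFuture standing in for the reply future; with the real template you would first map ConsumerRecord::value.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: a plain CompletableFuture stands in for the
// RequestReplyFuture returned by template.sendAndReceive(record).
public class AsyncReplyDemo {

    // Compose on the future instead of blocking with get(): the caller's
    // thread stays free while the reply is in flight.
    static CompletableFuture<Double> extractPrice(CompletableFuture<String> replyFuture) {
        return replyFuture
                .thenApply(Double::parseDouble) // with the real template: reply.value()
                .exceptionally(ex -> -1.0);     // fallback on timeout or failure
    }
}
```

This also removes the thread-per-request cost noted in the limitations table below: a thousand in-flight requests no longer mean a thousand parked threads.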
Replier Configuration
The replier is a standard @KafkaListener with @SendTo — it reads kafka_replyTopic from the request headers and sends the response there:
@Component
@RequiredArgsConstructor
public class PricingService {

    private final PricingEngine pricingEngine;

    @KafkaListener(topics = "pricing-requests", groupId = "pricing-service")
    @SendTo // sends the reply to the topic named in the kafka_replyTopic header
    public PriceResponse onPriceRequest(PriceRequest request) {
        double price = pricingEngine.calculate(request.getProductId(), request.getQuantity());
        return new PriceResponse(request.getProductId(), price);
    }
}
@SendTo with no topic argument reads the kafka_replyTopic header from the incoming request and sends the response there. The kafka_correlationId header is automatically echoed back.
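When the replier needs to attach extra headers to the response, it can return a Message<?> instead of the bare payload; Spring still echoes the correlation ID and resolves the reply topic. A sketch, in which PricingEngine and the version header are illustrative:

```java
// Fragment of the listener class; MessageBuilder is
// org.springframework.messaging.support.MessageBuilder.
@KafkaListener(topics = "pricing-requests", groupId = "pricing-service")
@SendTo
public Message<PriceResponse> onPriceRequest(PriceRequest request) {
    double price = pricingEngine.calculate(request.getProductId(), request.getQuantity());
    return MessageBuilder
            .withPayload(new PriceResponse(request.getProductId(), price))
            .setHeader("pricing-engine-version", "2.1") // illustrative custom header
            .build();
}
```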
Shared vs Per-Instance Reply Topics
flowchart TD
    subgraph Shared["Shared Reply Topic"]
        SR["pricing-replies (3 partitions)"]
        I1["Instance 1<br/>→ partition 0"]
        I2["Instance 2<br/>→ partition 1"]
        I3["Instance 3<br/>→ partition 2"]
        SR --> I1
        SR --> I2
        SR --> I3
    end
    subgraph PerInstance["Per-Instance Reply Topics"]
        PI1["pricing-replies-instance-1"]
        PI2["pricing-replies-instance-2"]
        I4["Instance 1"] --> PI1
        I5["Instance 2"] --> PI2
    end
Shared reply topic — all instances share one topic. Each instance consumes only from its assigned partitions. Simpler; requires the topic to have at least as many partitions as instances.
Per-instance reply topic — each instance creates its own reply topic (or uses a unique partition). More isolation but more topic management overhead.
Configuring each mode:
// Shared reply topic: tell the template that replies belonging to other
// instances may arrive (they are discarded and logged at DEBUG, not ERROR)
template.setSharedReplyTopic(true);

// Or route replies to a dedicated per-instance topic via the header
record.headers().add(KafkaHeaders.REPLY_TOPIC,
        "pricing-replies-instance-1".getBytes(StandardCharsets.UTF_8));
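To pin a shared-topic instance to a specific partition instead, create the reply container against an explicit TopicPartitionOffset; the template then sets the kafka_replyPartition header automatically, so the replier targets that partition directly. A sketch, assuming partition 0 belongs to this instance:

```java
@Bean
public ConcurrentMessageListenerContainer<String, PriceResponse> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, PriceResponse> factory) {
    // No consumer-group management: this instance reads exactly partition 0
    ConcurrentMessageListenerContainer<String, PriceResponse> container =
            factory.createContainer(new TopicPartitionOffset("pricing-replies", 0));
    container.setAutoStartup(false); // ReplyingKafkaTemplate starts it
    return container;
}
```

With a fixed assignment there is no rebalancing, so the partition-to-instance mapping must be managed by your deployment (for example, derived from an instance ordinal).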
Timeout and Error Handling
template.setDefaultReplyTimeout(Duration.ofSeconds(5));

// Per-request timeout override:
ProducerRecord<String, PriceRequest> record = new ProducerRecord<>(...);
RequestReplyFuture<String, PriceRequest, PriceResponse> future =
        template.sendAndReceive(record, Duration.ofSeconds(2));
When the reply timeout elapses, the template completes the future exceptionally, so future.get() throws ExecutionException wrapping a KafkaReplyTimeoutException. This is distinct from the TimeoutException thrown when a bounded get(timeout, unit) call itself runs out of time.
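The wrapping follows standard CompletableFuture semantics: the original exception becomes the cause of the ExecutionException. A minimal illustration, using RuntimeException as a stand-in since raising a real KafkaReplyTimeoutException requires a running template:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Demonstrates that a future completed exceptionally surfaces the original
// exception as the *cause* of ExecutionException, not as the thrown type.
public class TimeoutWrappingDemo {

    static String causeMessage(CompletableFuture<String> future) throws InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            return "cause: " + e.getCause().getMessage();
        }
    }
}
```

This is why error handling should unwrap getCause() before deciding whether the failure was a timeout, a serialization problem, or a broker error.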
Checking the Reply for Errors
If the replier throws an exception, Spring Kafka can propagate the failure back to the requester via headers. For example, when the replier's error handler publishes the failed record to the reply topic through a DeadLetterPublishingRecoverer, the reply carries the dead-letter exception headers:
ConsumerRecord<String, PriceResponse> reply = future.get(10, TimeUnit.SECONDS);

// Set by the replier side's DeadLetterPublishingRecoverer, if so configured
Header errorHeader = reply.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
if (errorHeader != null) {
    throw new PricingServiceException(
            "Replier error: " + new String(errorHeader.value()));
}
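Rather than repeating the header check at every call site, it can be centralized with setReplyErrorChecker: the function runs against each incoming reply, and a non-null return completes the future exceptionally with that exception. A sketch, assuming PricingServiceException extends RuntimeException:

```java
// Registered once on the template bean; callers then see the failure
// as an ExecutionException from future.get().
template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
    return error == null
            ? null // no error header: complete the future normally
            : new PricingServiceException("Replier error: " + new String(error.value()));
});
```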
Limitations of Request-Reply Over Kafka
| Concern | Detail |
|---|---|
| Latency | Round trip through the broker typically adds 5–50 ms — not suitable for sub-millisecond SLAs |
| Coupling | Creates temporal coupling — requester blocks while replier processes |
| Scalability | Each in-flight request holds a thread (unless using async CompletableFuture) |
| Failure modes | Replier crash, network partition, and timeout all produce the same TimeoutException |
Use request-reply over Kafka when you want the operational simplicity of a single messaging infrastructure and can tolerate Kafka-level latency. Use REST or gRPC for low-latency synchronous calls.
Key Takeaways
- ReplyingKafkaTemplate.sendAndReceive() sends a request record and blocks until a reply arrives on the reply topic with a matching kafka_correlationId
- The replier uses @KafkaListener + @SendTo (no topic argument) — Spring automatically reads kafka_replyTopic from the request headers
- Configure setDefaultReplyTimeout() and always handle TimeoutException — the replier may be down or overloaded
- Use a shared reply topic for simplicity; use per-instance topics for strict isolation
- Request-reply over Kafka is convenient but adds Kafka-level latency — evaluate whether REST or gRPC is more appropriate for your SLA
Next: Dynamic Listener Containers and Programmatic Topic Registration — create, start, and stop listener containers at runtime without predefined @KafkaListener annotations.