Request-Reply Pattern with ReplyingKafkaTemplate

When Kafka Needs to Be Synchronous

Kafka is designed for asynchronous event streaming. But some flows genuinely need a response: a payment validation service that must confirm before the order proceeds, or a pricing engine that must return the current price before checkout completes. ReplyingKafkaTemplate gives you a blocking send-and-receive call over Kafka without leaving the Kafka ecosystem.


How Request-Reply Works

sequenceDiagram
    participant Requester as "Order Service\n(ReplyingKafkaTemplate)"
    participant Broker
    participant Replier as "Pricing Service\n(@KafkaListener + @SendTo)"

    Requester->>Broker: send to "pricing-requests"\nheader: kafka_correlationId=UUID\nheader: kafka_replyTopic=pricing-replies
    Broker-->>Replier: deliver request
    Replier->>Broker: send reply to "pricing-replies"\nheader: kafka_correlationId=UUID (echoed)
    Broker-->>Requester: deliver reply matching correlationId
    Requester->>Requester: CompletableFuture resolved

The correlation ID links request to reply. ReplyingKafkaTemplate manages a map of in-flight correlation IDs internally — each sendAndReceive() call adds an entry and completes when the matching reply arrives.


Dependencies

No additional dependencies — ReplyingKafkaTemplate is included in spring-kafka.


Requester Configuration

@Configuration
public class RequestReplyConfig {

    @Bean
    public ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> replyingKafkaTemplate(
            ProducerFactory<String, PriceRequest> producerFactory,
            ConcurrentMessageListenerContainer<String, PriceResponse> replyContainer) {

        ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template =
            new ReplyingKafkaTemplate<>(producerFactory, replyContainer);

        template.setDefaultReplyTimeout(Duration.ofSeconds(10));
        template.setSharedReplyTopic(false);  // each instance has its own reply partition
        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, PriceResponse> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, PriceResponse> factory) {

        ConcurrentMessageListenerContainer<String, PriceResponse> container =
            factory.createContainer("pricing-replies");
        container.getContainerProperties().setGroupId("order-service-replies");
        container.setAutoStartup(false);  // ReplyingKafkaTemplate starts it
        return container;
    }
}

Sending a Request and Awaiting the Reply

@Service
@RequiredArgsConstructor
public class PricingClient {

    private final ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template;

    public PriceResponse getPrice(String productId, int quantity) {
        PriceRequest request = new PriceRequest(productId, quantity);

        ProducerRecord<String, PriceRequest> record =
            new ProducerRecord<>("pricing-requests", productId, request);

        RequestReplyFuture<String, PriceRequest, PriceResponse> future =
            template.sendAndReceive(record);

        try {
            ConsumerRecord<String, PriceResponse> reply = future.get(10, TimeUnit.SECONDS);
            return reply.value();
        } catch (TimeoutException e) {
            throw new PricingServiceTimeoutException("Pricing service did not respond");
        } catch (ExecutionException | InterruptedException e) {
            throw new PricingServiceException("Failed to get price", e);
        }
    }
}

sendAndReceive() returns a RequestReplyFuture — a CompletableFuture<ConsumerRecord<K, V>> that resolves when the matching reply arrives.


Replier Configuration

The replier is a standard @KafkaListener with @SendTo — it reads kafka_replyTopic from the request headers and sends the response there:

@Component
public class PricingService {

    @KafkaListener(topics = "pricing-requests", groupId = "pricing-service")
    @SendTo  // sends reply to the topic in the kafka_replyTopic header
    public PriceResponse onPriceRequest(PriceRequest request) {
        double price = pricingEngine.calculate(request.getProductId(), request.getQuantity());
        return new PriceResponse(request.getProductId(), price);
    }
}

@SendTo with no topic argument reads the kafka_replyTopic header from the incoming request and sends the response there. The kafka_correlationId header is automatically echoed back.


Shared vs Per-Instance Reply Topics

flowchart TD
    subgraph Shared["Shared Reply Topic"]
        SR["pricing-replies (3 partitions)"]
        I1["Instance 1\n→ partition 0"]
        I2["Instance 2\n→ partition 1"]
        I3["Instance 3\n→ partition 2"]
        SR --> I1
        SR --> I2
        SR --> I3
    end

    subgraph PerInstance["Per-Instance Reply Topics"]
        PI1["pricing-replies-instance-1"]
        PI2["pricing-replies-instance-2"]
        I4["Instance 1"] --> PI1
        I5["Instance 2"] --> PI2
    end

Shared reply topic — all instances share one topic. Each instance consumes only from its assigned partitions. Simpler; requires the topic to have at least as many partitions as instances.

Per-instance reply topic — each instance creates its own reply topic (or uses a unique partition). More isolation but more topic management overhead.

Configure which mode:

// Use a specific partition (instance 0 = partition 0)
template.setSharedReplyTopic(true);

// Or assign a unique reply topic per instance
record.headers().add(KafkaHeaders.REPLY_TOPIC,
    "pricing-replies-instance-1".getBytes());

Timeout and Error Handling

template.setDefaultReplyTimeout(Duration.ofSeconds(5));

// Per-request timeout override:
ProducerRecord<String, PriceRequest> record = new ProducerRecord<>(...);
RequestReplyFuture<String, PriceRequest, PriceResponse> future =
    template.sendAndReceive(record, Duration.ofSeconds(2));

On timeout, future.get() throws ExecutionException wrapping a KafkaReplyTimeoutException.


Checking the Reply for Errors

If the replier throws an exception, Spring Kafka can propagate it back via a header:

ConsumerRecord<String, PriceResponse> reply = future.get(10, TimeUnit.SECONDS);

// Check for error header (set by replier's error handler)
Header errorHeader = reply.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
if (errorHeader != null) {
    throw new PricingServiceException(
        "Replier error: " + new String(errorHeader.value()));
}

Limitations of Request-Reply Over Kafka

ConcernDetail
LatencyRound-trip through broker adds 5–50ms minimum — not suitable for sub-millisecond SLAs
CouplingCreates temporal coupling — requester blocks while replier processes
ScalabilityEach in-flight request holds a thread (unless using async CompletableFuture)
Failure modesReplier crash, network partition, and timeout all produce the same TimeoutException

Use request-reply over Kafka when you want the operational simplicity of a single messaging infrastructure and can tolerate Kafka-level latency. Use REST or gRPC for low-latency synchronous calls.


Key Takeaways

  • ReplyingKafkaTemplate.sendAndReceive() sends a request record and blocks until a reply arrives on the reply topic with a matching kafka_correlationId
  • The replier uses @KafkaListener + @SendTo (no topic argument) — Spring automatically reads kafka_replyTopic from the request headers
  • Configure setDefaultReplyTimeout() and always handle TimeoutException — the replier may be down or overloaded
  • Use a shared reply topic for simplicity; use per-instance topics for strict isolation
  • Request-reply over Kafka is convenient but adds Kafka-level latency — evaluate whether REST or gRPC is more appropriate for your SLA

Next: Dynamic Listener Containers and Programmatic Topic Registration — create, start, and stop listener containers at runtime without predefined @KafkaListener annotations.