Kafka Producer in Spring Boot: KafkaTemplate Basics
How a Spring Kafka Producer Works
KafkaTemplate is the central Spring Kafka class for sending messages. It wraps the native Kafka KafkaProducer, manages serialization, and provides a Spring-friendly API for sending records.
flowchart LR
App["Your Service\n(OrderService)"]
KT["KafkaTemplate\n(Spring Kafka)"]
Buffer["Producer Buffer\n(RecordAccumulator)"]
Sender["Sender Thread\n(NetworkClient)"]
Broker["Kafka Broker\n(Leader Partition)"]
App -->|"send(topic, key, value)"| KT
KT -->|serialize + route| Buffer
Buffer -->|batch when full\nor linger.ms elapsed| Sender
Sender -->|ProduceRequest| Broker
Broker -->|ProduceResponse| Sender
Sender -->|callback| App
The send is asynchronous by default — KafkaTemplate.send() returns immediately after placing the record in the producer’s internal buffer. The buffer is flushed to the broker in the background. The result (success or failure) is delivered via a callback or CompletableFuture.
Maven Dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot’s auto-configuration handles KafkaTemplate bean creation automatically when this dependency is on the classpath.
Minimal Configuration
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
With just these three properties, Spring Boot auto-configures a KafkaTemplate<String, String>.
Sending String Messages
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderEventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publishOrderPlaced(String orderId, String payload) {
kafkaTemplate.send("orders", orderId, payload);
}
}
The three-argument send(topic, key, value) form:
- topic: the target topic name
- key: used to route to a consistent partition (
null= round-robin) - value: the message payload (serialized by the configured
ValueSerializer)
Sending JSON (POJO) Messages
For production, send domain objects serialized as JSON rather than raw strings.
Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Domain Events
public record OrderPlacedEvent(
String orderId,
String customerId,
BigDecimal total,
List<OrderItem> items,
Instant placedAt
) {}
public record OrderItem(String productId, int quantity, BigDecimal unitPrice) {}
Publisher
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publishOrderPlaced(OrderPlacedEvent event) {
kafkaTemplate.send("orders", event.orderId(), event);
}
}
JsonSerializer converts the OrderPlacedEvent to JSON bytes and adds a __TypeId__ header so the consumer knows what type to deserialize to.
Handling Send Results
Fire and Forget (Default)
kafkaTemplate.send("orders", orderId, event);
// Returns CompletableFuture — ignoring it means ignoring errors
Never ignore send failures in production — use a callback or await the result.
Callback-Based (Non-Blocking)
public void publishOrderPlaced(OrderPlacedEvent event) {
CompletableFuture<SendResult<String, OrderPlacedEvent>> future =
kafkaTemplate.send("orders", event.orderId(), event);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send order event: orderId={}", event.orderId(), ex);
// handle failure: retry, alert, DLQ
} else {
RecordMetadata metadata = result.getRecordMetadata();
log.info("Order event sent: orderId={} partition={} offset={}",
event.orderId(),
metadata.partition(),
metadata.offset());
}
});
}
The callback runs on the Kafka sender thread — avoid blocking operations inside it.
Awaiting the Result (Blocking)
For cases where you need to confirm delivery before proceeding:
public void publishAndConfirm(OrderPlacedEvent event) throws ExecutionException, InterruptedException {
SendResult<String, OrderPlacedEvent> result =
kafkaTemplate.send("orders", event.orderId(), event).get();
log.info("Confirmed delivery: partition={} offset={}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
.get() blocks the calling thread until the broker acknowledges the write. Use this sparingly — it reduces throughput significantly. Prefer the callback approach for high-volume producers.
ProducerRecord: Full Control
For maximum control over the record — specifying partition, timestamp, or headers:
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
public void publishWithMetadata(OrderPlacedEvent event) {
ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
"orders", // topic
null, // partition (null = use key-based routing)
System.currentTimeMillis(), // timestamp
event.orderId(), // key
event, // value
List.of( // headers
new RecordHeader("source", "order-service".getBytes()),
new RecordHeader("version", "1".getBytes()),
new RecordHeader("correlationId", UUID.randomUUID().toString().getBytes())
)
);
kafkaTemplate.send(new Message<>(record));
}
Or using Spring’s MessageBuilder:
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
Message<OrderPlacedEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.TOPIC, "orders")
.setHeader(KafkaHeaders.KEY, event.orderId())
.setHeader("source", "order-service")
.setHeader("correlationId", UUID.randomUUID().toString())
.build();
kafkaTemplate.send(message);
Setting a Default Topic
If you always send to the same topic, set a default:
spring.kafka.template.default-topic=orders
// Now you can omit the topic
kafkaTemplate.sendDefault(event.orderId(), event);
The Full Order Service
Putting it together — a REST-triggered producer:
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private final OrderService orderService;
private final OrderEventPublisher eventPublisher;
@PostMapping
public ResponseEntity<OrderResponse> placeOrder(@RequestBody @Valid PlaceOrderRequest request) {
Order order = orderService.createOrder(request);
OrderPlacedEvent event = new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getTotal(),
order.getItems().stream()
.map(i -> new OrderItem(i.getProductId(), i.getQuantity(), i.getUnitPrice()))
.toList(),
Instant.now()
);
eventPublisher.publishOrderPlaced(event);
return ResponseEntity.status(HttpStatus.CREATED)
.body(new OrderResponse(order.getId(), "PENDING"));
}
}
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publishOrderPlaced(OrderPlacedEvent event) {
kafkaTemplate.send("orders", event.orderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("[KAFKA] Failed to publish OrderPlacedEvent: orderId={}",
event.orderId(), ex);
} else {
log.info("[KAFKA] Published OrderPlacedEvent: orderId={} → partition={} offset={}",
event.orderId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
Producer Lifecycle
stateDiagram-v2
[*] --> Created : KafkaTemplate initialized\n(first send or explicit init)
Created --> Active : Producer connected to broker
Active --> Active : Sending records (batching in buffer)
Active --> Flushing : kafkaTemplate.flush() called
Flushing --> Active : All buffered records sent
Active --> Closed : Application shutdown
Closed --> [*]
Spring Kafka manages the producer lifecycle automatically — the underlying KafkaProducer is created lazily on the first send and closed when the Spring context shuts down.
To explicitly flush all buffered records (e.g. before a scheduled shutdown):
kafkaTemplate.flush();
Verifying with CLI
After your Spring Boot app sends events, verify with the CLI consumer:
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--property print.key=true \
--property print.headers=true
You should see the JSON payload and any custom headers.
Key Takeaways
KafkaTemplate.send()is asynchronous — the record enters the producer buffer, not the broker- Use
CompletableFuture.whenComplete()to handle success and failure without blocking - Send POJOs with
JsonSerializer—KafkaTemplate<String, YourEvent>andvalue-serializer=JsonSerializer - Use
ProducerRecordorMessageBuilderfor full control over partition, timestamp, and headers - Always handle send failures — a fire-and-forget send hides data loss
- Set
spring.kafka.template.default-topicif all sends go to the same topic
Next: Sending Messages with Keys, Headers, and Custom Partitioning — control exactly which partition a message lands on using keys, custom partitioners, and explicit partition assignment.