Kafka Producer in Spring Boot: KafkaTemplate Basics

How a Spring Kafka Producer Works

KafkaTemplate is the central Spring Kafka class for sending messages. It wraps the native Kafka KafkaProducer, manages serialization, and provides a Spring-friendly API for sending records.

flowchart LR
    App["Your Service\n(OrderService)"]
    KT["KafkaTemplate\n(Spring Kafka)"]
    Buffer["Producer Buffer\n(RecordAccumulator)"]
    Sender["Sender Thread\n(NetworkClient)"]
    Broker["Kafka Broker\n(Leader Partition)"]

    App -->|"send(topic, key, value)"| KT
    KT -->|serialize + route| Buffer
    Buffer -->|batch when full\nor linger.ms elapsed| Sender
    Sender -->|ProduceRequest| Broker
    Broker -->|ProduceResponse| Sender
    Sender -->|callback| App

The send is asynchronous by default: KafkaTemplate.send() returns immediately after placing the record in the producer’s internal buffer. The buffer is flushed to the broker in the background, and the result (success or failure) is delivered via a callback or the returned CompletableFuture.
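
Batching is tunable through standard producer properties. A minimal sketch with illustrative values (not recommendations): batch-size maps to the producer's batch.size, and linger.ms is passed through the generic properties map.

# application.properties (illustrative batching values)
spring.kafka.producer.batch-size=32768
spring.kafka.producer.properties.linger.ms=10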


Maven Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Spring Boot auto-configures a KafkaTemplate bean when this dependency is on the classpath. No version is needed; it is managed by Spring Boot's dependency management.


Minimal Configuration

# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

With just these three properties, Spring Boot auto-configures a KafkaTemplate<String, String>.


Sending String Messages

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderPlaced(String orderId, String payload) {
        kafkaTemplate.send("orders", orderId, payload);
    }
}

The three-argument send(topic, key, value) form:

  • topic: the target topic name
  • key: hashed so that messages with the same key land on the same partition; a null key lets the partitioner decide (sticky partitioning in Kafka clients 2.4+, round-robin before that); see the snippet after this list
  • value: the message payload (serialized by the configured ValueSerializer)
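
For illustration, a keyed send versus a key-less send (reusing the publisher's kafkaTemplate, orderId, and payload from above):

// Same key → same partition, so per-order ordering is preserved
kafkaTemplate.send("orders", orderId, payload);

// No key: the partitioner chooses (sticky partitioning in Kafka clients 2.4+)
kafkaTemplate.send("orders", payload);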

Sending JSON (POJO) Messages

For production, send domain objects serialized as JSON rather than raw strings.

Configuration

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Domain Events

public record OrderPlacedEvent(
    String orderId,
    String customerId,
    BigDecimal total,
    List<OrderItem> items,
    Instant placedAt
) {}

public record OrderItem(String productId, int quantity, BigDecimal unitPrice) {}
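
For reference, a serialized OrderPlacedEvent looks roughly like the JSON below. The values are hypothetical, and the exact Instant formatting depends on the Jackson modules on the classpath:

{
  "orderId": "ord-1001",
  "customerId": "cust-42",
  "total": 59.98,
  "items": [{"productId": "sku-7", "quantity": 2, "unitPrice": 29.99}],
  "placedAt": "2024-01-15T10:30:00Z"
}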

Publisher

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderPlaced(OrderPlacedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}

JsonSerializer converts the OrderPlacedEvent to JSON bytes and adds a __TypeId__ header so the consumer knows what type to deserialize to.
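
If your consumers configure the target type explicitly, the type header can be switched off via JsonSerializer's add-type-headers property:

# application.properties
spring.kafka.producer.properties.spring.json.add.type.headers=false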


Handling Send Results

Fire and Forget (Default)

kafkaTemplate.send("orders", orderId, event);
// Returns CompletableFuture — ignoring it means ignoring errors

Never ignore send failures in production — use a callback or await the result.
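
Independently of how you observe results, the producer itself can be hardened. A minimal sketch with illustrative values; note that recent Kafka clients already default to acks=all with idempotence enabled:

# application.properties (illustrative reliability settings)
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.properties.enable.idempotence=true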

Callback-Based (Non-Blocking)

public void publishOrderPlaced(OrderPlacedEvent event) {
    CompletableFuture<SendResult<String, OrderPlacedEvent>> future =
        kafkaTemplate.send("orders", event.orderId(), event);

    future.whenComplete((result, ex) -> {
        if (ex != null) {
            log.error("Failed to send order event: orderId={}", event.orderId(), ex);
            // handle failure: retry, alert, DLQ
        } else {
            RecordMetadata metadata = result.getRecordMetadata();
            log.info("Order event sent: orderId={} partition={} offset={}",
                event.orderId(),
                metadata.partition(),
                metadata.offset());
        }
    });
}

The callback runs on the Kafka sender thread — avoid blocking operations inside it.
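
If the completion work is heavy, shift it onto your own executor instead. A sketch, assuming an injected java.util.concurrent.Executor named callbackExecutor:

// Runs the handler on callbackExecutor instead of the producer I/O thread
future.whenCompleteAsync((result, ex) -> {
    // same success/failure handling as above
}, callbackExecutor);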

Awaiting the Result (Blocking)

For cases where you need to confirm delivery before proceeding:

public void publishAndConfirm(OrderPlacedEvent event) throws ExecutionException, InterruptedException {
    SendResult<String, OrderPlacedEvent> result =
        kafkaTemplate.send("orders", event.orderId(), event).get();

    log.info("Confirmed delivery: partition={} offset={}",
        result.getRecordMetadata().partition(),
        result.getRecordMetadata().offset());
}

.get() blocks the calling thread until the broker acknowledges the write. Use this sparingly — it reduces throughput significantly. Prefer the callback approach for high-volume producers.
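
When you do block, prefer a bounded wait over an unbounded get(). A sketch with an illustrative timeout (TimeUnit and TimeoutException come from java.util.concurrent):

try {
    kafkaTemplate.send("orders", event.orderId(), event)
        .get(5, TimeUnit.SECONDS); // bounded wait for the broker acknowledgment
} catch (TimeoutException e) {
    // no acknowledgment within 5 seconds: retry, alert, or fail the request
} catch (InterruptedException | ExecutionException e) {
    // interrupted, or the send itself failed
}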


ProducerRecord: Full Control

For maximum control over the record — specifying partition, timestamp, or headers:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.List;
import java.util.UUID;

public void publishWithMetadata(OrderPlacedEvent event) {
    ProducerRecord<String, OrderPlacedEvent> record = new ProducerRecord<>(
        "orders",                    // topic
        null,                        // partition (null = use key-based routing)
        System.currentTimeMillis(),  // timestamp
        event.orderId(),             // key
        event,                       // value
        List.of(                     // headers
            new RecordHeader("source", "order-service".getBytes()),
            new RecordHeader("version", "1".getBytes()),
            new RecordHeader("correlationId", UUID.randomUUID().toString().getBytes())
        )
    );

    kafkaTemplate.send(record);
}
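
The ProducerRecord overload returns the same CompletableFuture<SendResult<K, V>> as the other send variants, so the result handling shown earlier applies unchanged.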

Or using Spring’s MessageBuilder:

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;

Message<OrderPlacedEvent> message = MessageBuilder
    .withPayload(event)
    .setHeader(KafkaHeaders.TOPIC, "orders")
    .setHeader(KafkaHeaders.KEY, event.orderId())
    .setHeader("source", "order-service")
    .setHeader("correlationId", UUID.randomUUID().toString())
    .build();

kafkaTemplate.send(message);

Setting a Default Topic

If you always send to the same topic, set a default:

# application.properties
spring.kafka.template.default-topic=orders

// Now you can omit the topic
kafkaTemplate.sendDefault(event.orderId(), event);
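
Explicit send("other-topic", ...) calls still work alongside this; the default topic only applies to the sendDefault family of methods.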

The Full Order Service

Putting it together — a REST-triggered producer:

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderService orderService;
    private final OrderEventPublisher eventPublisher;

    @PostMapping
    public ResponseEntity<OrderResponse> placeOrder(@RequestBody @Valid PlaceOrderRequest request) {
        Order order = orderService.createOrder(request);

        OrderPlacedEvent event = new OrderPlacedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getTotal(),
            order.getItems().stream()
                .map(i -> new OrderItem(i.getProductId(), i.getQuantity(), i.getUnitPrice()))
                .toList(),
            Instant.now()
        );

        eventPublisher.publishOrderPlaced(event);

        return ResponseEntity.status(HttpStatus.CREATED)
            .body(new OrderResponse(order.getId(), "PENDING"));
    }
}

And the publisher, with full result handling:

@Service
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderPlaced(OrderPlacedEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[KAFKA] Failed to publish OrderPlacedEvent: orderId={}",
                        event.orderId(), ex);
                } else {
                    log.info("[KAFKA] Published OrderPlacedEvent: orderId={} → partition={} offset={}",
                        event.orderId(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}

Producer Lifecycle

stateDiagram-v2
    [*] --> Created : KafkaTemplate initialized\n(first send or explicit init)
    Created --> Active : Producer connected to broker
    Active --> Active : Sending records (batching in buffer)
    Active --> Flushing : kafkaTemplate.flush() called
    Flushing --> Active : All buffered records sent
    Active --> Closed : Application shutdown
    Closed --> [*]

Spring Kafka manages the producer lifecycle automatically — the underlying KafkaProducer is created lazily on the first send and closed when the Spring context shuts down.

To explicitly flush all buffered records (e.g. before a scheduled shutdown):

kafkaTemplate.flush();
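
For example, a sketch of flushing from a lifecycle hook; the method name is illustrative and assumes the publisher is a Spring-managed bean (closing the producer on shutdown also flushes pending records, so this is belt-and-braces):

import jakarta.annotation.PreDestroy;

@PreDestroy
public void flushBeforeShutdown() {
    // push any records still sitting in the buffer to the broker
    kafkaTemplate.flush();
}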

Verifying with CLI

After your Spring Boot app sends events, verify with the CLI consumer:

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --property print.key=true \
  --property print.headers=true

You should see the JSON payload and any custom headers.


Key Takeaways

  • KafkaTemplate.send() is asynchronous — the record enters the producer buffer, not the broker
  • Use CompletableFuture.whenComplete() to handle success and failure without blocking
  • Send POJOs as JSON: declare a KafkaTemplate<String, YourEvent> and configure value-serializer=JsonSerializer
  • Use ProducerRecord or MessageBuilder for full control over partition, timestamp, and headers
  • Always handle send failures — a fire-and-forget send hides data loss
  • Set spring.kafka.template.default-topic if all sends go to the same topic

Next: Sending Messages with Keys, Headers, and Custom Partitioning — control exactly which partition a message lands on using keys, custom partitioners, and explicit partition assignment.