Part 43 of 59
Producing and Consuming Kafka Messages
This article implements Kafka producers and consumers with the full production setup — error handling, retries, dead-letter topics, and idempotent consumers.
Producer: KafkaTemplate
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void publishOrderCreated(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getCustomerEmail(),
order.getItems().stream().map(OrderItemDto::from).toList(),
order.getTotalAmount(),
Instant.now()
);
// Key = customerId: all events for same customer go to same partition
kafkaTemplate.send("order-events", order.getCustomerId().toString(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish OrderCreated event: orderId={}", order.getId(), ex);
} else {
log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
order.getId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
public void publishPaymentProcessed(Payment payment) {
PaymentProcessedEvent event = new PaymentProcessedEvent(
payment.getId(),
payment.getOrderId(),
payment.getAmount(),
payment.getCurrency(),
Instant.now()
);
kafkaTemplate.send("payment-events", payment.getOrderId().toString(), event);
}
}
Calling the publisher from the service:
/**
 * Application service that persists new orders and announces them on Kafka.
 * The class-level {@code @Transactional} applies to every public method.
 */
@Service
@RequiredArgsConstructor
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OrderEventPublisher eventPublisher;

    /**
     * Builds an order from the request, saves it, and publishes an OrderCreated event.
     *
     * @param request the incoming order data
     * @return the saved order
     */
    public Order createOrder(CreateOrderRequest request) {
        Order saved = orderRepository.save(buildOrder(request));

        // ⚠️ Problem: if Kafka publish fails, the order is committed to DB
        // but the event is lost. See Article 44 for the fix (Outbox Pattern).
        eventPublisher.publishOrderCreated(saved);

        return saved;
    }
}
Producer Configuration
/**
 * Producer-side Kafka configuration: String keys, JSON values, durable acknowledgements
 * and light batching.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Creates the producer factory backing {@link #kafkaTemplate()}.
     *
     * @return a factory configured with the settings below
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Connection and wire format
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Durability: wait for all in-sync replicas, retry transient failures, and
        // enable the idempotent producer so those retries do not write duplicates
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Throughput: collect up to 16 KiB per batch, lingering at most 5 ms
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Creates the template used by publishers to send records.
     *
     * @return a template built on {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
Consumer: @KafkaListener
/**
 * Consumes {@code OrderCreatedEvent}s from the {@code order-events} topic for the
 * inventory service and reserves stock for each new order.
 *
 * <p>Offsets are acknowledged manually, only after processing succeeded. Failures are
 * rethrown so the container's error handler can deal with the record.
 */
@Component
@RequiredArgsConstructor // the final fields below need constructor injection
@Slf4j
public class OrderEventConsumer {

    private final InventoryService inventoryService;
    private final ProcessedEventRepository processedEventRepository;

    /**
     * Handles one OrderCreated record.
     *
     * @param event     the deserialized event payload
     * @param partition partition the record was read from (logged only)
     * @param offset    offset of the record (logged only)
     * @param ack       handle used to commit the offset after successful processing
     */
    @KafkaListener(
            topics = "order-events",
            groupId = "inventory-service",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void onOrderCreated(
            @Payload OrderCreatedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
        log.info("Received OrderCreated: orderId={}, partition={}, offset={}",
                event.orderId(), partition, offset);
        try {
            processEvent(event);
            ack.acknowledge(); // commit offset only after successful processing
        } catch (Exception e) {
            log.error("Failed to process OrderCreated: orderId={}", event.orderId(), e);
            // Do NOT acknowledge — message will be retried
            throw e; // let the error handler deal with it
        }
    }

    /**
     * Reserves inventory for the order unless this event was already processed.
     *
     * <p>NOTE(review): the existence check and the later save are two separate repository
     * calls; confirm that a unique constraint or a surrounding transaction covers
     * concurrent redelivery of the same event.
     */
    private void processEvent(OrderCreatedEvent event) {
        // Idempotency check — don't process the same event twice
        if (processedEventRepository.existsByEventId(event.orderId().toString())) {
            log.info("Skipping duplicate event: orderId={}", event.orderId());
            return;
        }
        inventoryService.reserveItems(event.orderId(), event.items());
        // Remember the event so a redelivery is recognised as a duplicate
        processedEventRepository.save(new ProcessedEvent(
                event.orderId().toString(), "OrderCreated", Instant.now()));
    }
}
Consumer Configuration with Error Handling
/**
 * Consumer-side Kafka configuration: JSON deserialization, manual acknowledgements,
 * and retry with exponential back-off followed by dead-lettering.
 */
@Configuration
public class KafkaConsumerConfig {

    // Spring Kafka's ErrorHandlingDeserializer and its delegate properties, referenced by
    // name so this snippet needs no additional import. Kafka accepts a class name string
    // wherever a deserializer class is expected.
    private static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";
    private static final String KEY_DELEGATE_CLASS = "spring.deserializer.key.delegate.class";
    private static final String VALUE_DELEGATE_CLASS = "spring.deserializer.value.delegate.class";

    /**
     * Creates the consumer factory used by the listener containers.
     *
     * @return a factory that reads String keys and JSON values with auto-commit disabled
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");

        // Wrap the real deserializers: a record that cannot be deserialized is then handed
        // to the error handler as a DeserializationException instead of failing inside
        // poll(), where it would block the partition and never reach the dead-letter topic.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ERROR_HANDLING_DESERIALIZER);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ERROR_HANDLING_DESERIALIZER);
        config.put(KEY_DELEGATE_CLASS, StringDeserializer.class);
        config.put(VALUE_DELEGATE_CLASS, JsonDeserializer.class);

        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets are acked by listeners
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.devopsmonk.*");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    /**
     * Creates the default listener container factory.
     *
     * @param consumerFactory factory for the underlying Kafka consumers
     * @param kafkaTemplate   template used to publish dead-letter records
     * @return a factory with 3 concurrent consumers, manual-immediate acks and the
     *         retry/dead-letter error handler
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // 3 consumer threads per listener
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Retry with exponential backoff, then dead-letter
        factory.setCommonErrorHandler(errorHandler(kafkaTemplate));
        return factory;
    }

    /**
     * Builds the error handler: retries with exponential back-off, then publishes the
     * record to {@code <topic>.DLT} on the same partition number.
     */
    private DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // Dead-letter publisher
        // NOTE(review): records that failed deserialization are dead-lettered with their
        // raw byte[] payload; confirm the template's serializer handles byte[] as intended.
        var dlqPublisher = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

        // Exponential backoff starting at 1s and doubling each time (1s, 2s, 4s, 8s, ...).
        // No further retry is scheduled once the accumulated delay reaches 30s; the record
        // then goes to the DLT.
        var backoff = new ExponentialBackOff(1000L, 2.0);
        backoff.setMaxElapsedTime(30000L);

        var errorHandler = new DefaultErrorHandler(dlqPublisher, backoff);

        // With MANUAL_IMMEDIATE acks nobody acknowledges a record that was dead-lettered,
        // so let the handler commit its offset; otherwise it is redelivered after a
        // restart or rebalance.
        errorHandler.setCommitRecovered(true);

        // Don't retry these — they'll never succeed
        errorHandler.addNotRetryableExceptions(
                JsonParseException.class,
                DeserializationException.class
        );
        return errorHandler;
    }
}
Dead-Letter Topic Consumer
Events that fail all retries land in the .DLT topic:
/**
 * Consumes records from {@code order-events.DLT}: logs them, stores them for manual
 * inspection/reprocessing, and alerts the on-call engineer.
 */
@Component
@RequiredArgsConstructor // the final fields below need constructor injection
@Slf4j
public class DeadLetterConsumer {

    private final AlertService alertService;
    private final FailedEventRepository failedEventRepository;

    /**
     * Listener entry point. Delegates to
     * {@link #handleDeadLetter(ConsumerRecord, String, String)} and acknowledges the
     * record afterwards; the container factory uses manual acknowledgements, so without
     * this the DLT offsets would never be committed.
     *
     * @param record           the dead-lettered record
     * @param exceptionClass   class name of the failure, or {@code null} if the header is absent
     * @param exceptionMessage message of the failure, or {@code null} if the header is absent
     * @param ack              handle used to commit the offset once the record is stored
     */
    @KafkaListener(
            topics = "order-events.DLT",
            groupId = "dlt-handler",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, Object> record,
            // DeadLetterPublishingRecoverer writes failure details under the DLT_* header
            // names. The message header is missing when the exception had no message, so
            // neither header is treated as required.
            @Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exceptionClass,
            @Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exceptionMessage,
            Acknowledgment ack) {
        handleDeadLetter(record, exceptionClass, exceptionMessage);
        ack.acknowledge(); // only reached when storing and alerting succeeded
    }

    /**
     * Logs, stores and alerts on one dead-lettered record. Kept with its original
     * signature so existing direct callers continue to work.
     *
     * @param record           the dead-lettered record
     * @param exceptionClass   class name of the failure, may be {@code null}
     * @param exceptionMessage message of the failure, may be {@code null}
     */
    public void handleDeadLetter(
            ConsumerRecord<String, Object> record,
            String exceptionClass,
            String exceptionMessage) {
        log.error("Dead letter received: topic={}, key={}, exception={}: {}",
                record.topic(), record.key(), exceptionClass, exceptionMessage);

        // Store for manual inspection/reprocessing.
        // String.valueOf tolerates a null value instead of throwing a NullPointerException.
        failedEventRepository.save(new FailedEvent(
                record.topic(),
                record.key(),
                String.valueOf(record.value()),
                exceptionClass,
                exceptionMessage,
                Instant.now()
        ));

        // Alert the on-call engineer
        alertService.sendAlert("Dead letter: " + exceptionClass + " processing " + record.key());
    }
}
Multiple Event Types on One Topic
/**
 * Notification-service listener for the {@code order-events} topic, which carries
 * several event types. Each record is routed by the runtime type of its value.
 */
@Component
public class OrderEventConsumer {

    /**
     * Routes the record's value to the matching notification and then acknowledges it.
     * Values of any other type are acknowledged without further action.
     *
     * @param record the consumed record
     * @param ack    handle used to commit the offset
     */
    @KafkaListener(topics = "order-events", groupId = "notification-service")
    public void handle(ConsumerRecord<String, Object> record, Acknowledgment ack) {
        dispatch(record.value());
        ack.acknowledge();
    }

    /** Sends the notification that corresponds to the payload's type, if any. */
    private void dispatch(Object payload) {
        if (payload instanceof OrderCreatedEvent created) {
            sendOrderConfirmation(created);
            return;
        }
        if (payload instanceof OrderShippedEvent shipped) {
            sendShippingNotification(shipped);
            return;
        }
        if (payload instanceof OrderCancelledEvent cancelled) {
            sendCancellationNotification(cancelled);
        }
    }
}
Configure the deserializer to map types:
spring:
  kafka:
    consumer:
      properties:
        # Fallback target type when a record carries no type information
        spring.json.value.default.type: java.lang.Object
        # Pick the concrete event class from the type headers written by the producer
        spring.json.use.type.headers: true
Parallel Consumer with Partitioning
/**
 * Records each order-created event for analytics and then acknowledges it.
 *
 * @param event the deserialized event
 * @param ack   handle used to commit the offset after the event was recorded
 */
@KafkaListener(
        groupId = "analytics-service",
        topics = "order-events",
        concurrency = "6" // 6 threads, one per partition
)
public void handleOrderEvent(OrderCreatedEvent event, Acknowledgment ack) {
    // Record first; the offset is committed only if recording did not throw
    analyticsService.recordOrderEvent(event);
    ack.acknowledge();
}
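With six partitions, each of the six threads handles one partition. Keep concurrency ≤ the partition count: a partition is consumed by only one thread of a group at a time, so any extra threads sit idle.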
Testing Kafka
Use @EmbeddedKafka for unit/integration tests:
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = {"order-events"},
brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}
)
class OrderEventProducerTest {
@Autowired KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
@Qualifier("order-events")
KafkaMessageListenerContainer<String, Object> container;
private BlockingQueue<ConsumerRecord<String, Object>> records = new LinkedBlockingQueue<>();
@BeforeEach
void setup() {
ContainerTestUtils.waitForAssignment(container, 1);
}
@Test
void publishesOrderCreatedEvent() throws InterruptedException {
kafkaTemplate.send("order-events", "key-1", new OrderCreatedEvent(
UUID.randomUUID(), UUID.randomUUID(), "alice@example.com",
List.of(), BigDecimal.TEN, Instant.now()
));
ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
assertThat(record).isNotNull();
assertThat(record.value()).isInstanceOf(OrderCreatedEvent.class);
}
}
Or use Testcontainers with a real Kafka:
/**
 * Integration test skeleton that runs against a real Kafka broker started by
 * Testcontainers.
 */
@SpringBootTest
@Testcontainers
class KafkaIntegrationTest {

    /** Broker image used for the test container. */
    private static final DockerImageName KAFKA_IMAGE =
            DockerImageName.parse("confluentinc/cp-kafka:7.6.0");

    @Container
    @ServiceConnection
    static KafkaContainer kafka = new KafkaContainer(KAFKA_IMAGE);

    // Full Kafka available — no mocking
}
What You’ve Learned
- KafkaTemplate.send(topic, key, value) publishes events — the key determines which partition
- @KafkaListener subscribes to a topic; use Acknowledgment.acknowledge() to commit offsets manually
- DefaultErrorHandler with ExponentialBackOff retries failed messages, then sends them to .DLT
- Dead-letter consumers handle persistently-failing events — store for inspection, alert on-call
- Idempotent consumers check for duplicate events before processing — required for at-least-once delivery
- @EmbeddedKafka or KafkaContainer (Testcontainers) enable reliable Kafka integration tests
Next: Article 44 — Reliable Event Publishing: The Transactional Outbox Pattern — guarantee events are published even when the application crashes mid-transaction.