Producing and Consuming Kafka Messages

This article builds production-ready Kafka producers and consumers — with error handling, retries, dead-letter topics, and idempotent consumers.

Producer: KafkaTemplate

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getCustomerEmail(),
            order.getItems().stream().map(OrderItemDto::from).toList(),
            order.getTotalAmount(),
            Instant.now()
        );

        // Key = customerId: all events for same customer go to same partition
        kafkaTemplate.send("order-events", order.getCustomerId().toString(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish OrderCreated event: orderId={}", order.getId(), ex);
                } else {
                    log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
                        order.getId(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }

    public void publishPaymentProcessed(Payment payment) {
        PaymentProcessedEvent event = new PaymentProcessedEvent(
            payment.getId(),
            payment.getOrderId(),
            payment.getAmount(),
            payment.getCurrency(),
            Instant.now()
        );

        kafkaTemplate.send("payment-events", payment.getOrderId().toString(), event);
    }
}
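The customerId key in publishOrderCreated matters because Kafka's default partitioner maps equal keys to the same partition (a murmur2 hash of the key bytes, mod the partition count). A minimal sketch of that mapping — using `String.hashCode()` as a stand-in for murmur2, so the partition numbers won't match real Kafka, but the same-key-same-partition property holds:

```java
public class PartitionSketch {

    // Illustrative only: real Kafka uses murmur2(keyBytes) % numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The same customer key always lands on the same partition,
        // so one customer's events are consumed in order.
        int p1 = partitionFor("customer-42", partitions);
        int p2 = partitionFor("customer-42", partitions);
        System.out.println(p1 == p2);  // always true
    }
}
```

This is why keying by customerId (rather than, say, a random UUID per event) preserves per-customer ordering.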

Calling the publisher from the service:

@Service
@RequiredArgsConstructor
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OrderEventPublisher eventPublisher;

    public Order createOrder(CreateOrderRequest request) {
        Order order = buildOrder(request);
        order = orderRepository.save(order);

        eventPublisher.publishOrderCreated(order);
        // ⚠️ Problem: if Kafka publish fails, the order is committed to DB
        // but the event is lost. See Article 44 for the fix (Outbox Pattern).

        return order;
    }
}

Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings
        config.put(ProducerConfig.ACKS_CONFIG, "all");           // wait for all replicas
        config.put(ProducerConfig.RETRIES_CONFIG, 10);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // broker dedupes producer retries

        // Batching (higher throughput)
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);           // wait 5ms to batch

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
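If you prefer properties over a @Configuration class, the same producer settings can be expressed in application.yml and Spring Boot will auto-configure the ProducerFactory and KafkaTemplate for you (a sketch — `linger.ms` and `enable.idempotence` go under `properties` because Boot has no dedicated keys for them):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 10
      batch-size: 16384
      properties:
        enable.idempotence: true
        linger.ms: 5
```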

Consumer: @KafkaListener

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderEventConsumer {

    private final InventoryService inventoryService;
    private final ProcessedEventRepository processedEventRepository;

    @KafkaListener(
        topics = "order-events",
        groupId = "inventory-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void onOrderCreated(
            @Payload OrderCreatedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {

        log.info("Received OrderCreated: orderId={}, partition={}, offset={}",
            event.orderId(), partition, offset);

        try {
            processEvent(event);
            ack.acknowledge();  // commit offset only after successful processing

        } catch (Exception e) {
            log.error("Failed to process OrderCreated: orderId={}", event.orderId(), e);
            // Do NOT acknowledge — message will be retried
            throw e;  // let the error handler deal with it
        }
    }

    private void processEvent(OrderCreatedEvent event) {
        // Idempotency check — don't process the same event twice
        if (processedEventRepository.existsByEventId(event.orderId().toString())) {
            log.info("Skipping duplicate event: orderId={}", event.orderId());
            return;
        }

        inventoryService.reserveItems(event.orderId(), event.items());

        processedEventRepository.save(new ProcessedEvent(
            event.orderId().toString(), "OrderCreated", Instant.now()));
    }
}
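The existsByEventId check above is the heart of the idempotent consumer. Stripped of Spring and the database, the contract looks like this — an in-memory set stands in for the ProcessedEvent table (names and the Set-backed store are illustrative; production uses a table with a unique constraint on the event id):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotencyGuard {

    // In production this is a ProcessedEvent table with a unique
    // constraint on eventId, not an in-memory set.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time an eventId is seen. */
    public boolean markProcessed(String eventId) {
        return processed.add(eventId);  // Set.add is atomic: false on duplicate
    }

    public static void main(String[] args) {
        IdempotencyGuard guard = new IdempotencyGuard();
        System.out.println(guard.markProcessed("order-1"));  // true  — first delivery, process it
        System.out.println(guard.markProcessed("order-1"));  // false — redelivery, skip
    }
}
```

Note that the exists-then-save sequence in the listener has a small race window if the same event is redelivered to two threads concurrently; a unique constraint on the event id (the insert fails on the duplicate) closes it.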

Consumer Configuration with Error Handling

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.devopsmonk.*");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);   // 3 consumer threads per listener
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Retry with exponential backoff, then dead-letter
        factory.setCommonErrorHandler(errorHandler(kafkaTemplate));

        return factory;
    }

    private DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // Dead-letter publisher
        var dlqPublisher = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

        // Exponential backoff: 1s, 2s, 4s, ... — then send to DLT
        var backoff = new ExponentialBackOff(1000L, 2.0);
        backoff.setMaxElapsedTime(30000L);  // stop retrying after ~30s of accumulated backoff

        var errorHandler = new DefaultErrorHandler(dlqPublisher, backoff);

        // Don't retry these — they'll never succeed
        errorHandler.addNotRetryableExceptions(
            JsonParseException.class,
            DeserializationException.class
        );

        return errorHandler;
    }
}
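To see which delays that ExponentialBackOff actually hands out before the DLT kicks in, here is a stand-alone calculation. It mirrors the initial-interval/multiplier/max-elapsed semantics — treat it as an approximation of Spring's implementation, not a copy of it:

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    // Approximates ExponentialBackOff(initial, multiplier) with
    // setMaxElapsedTime(maxElapsed): intervals are handed out until
    // the accumulated backoff time reaches the cap.
    static List<Long> schedule(long initial, double multiplier, long maxElapsed) {
        List<Long> delays = new ArrayList<>();
        long elapsed = 0;
        long interval = initial;
        while (elapsed < maxElapsed) {
            delays.add(interval);
            elapsed += interval;
            interval = (long) (interval * multiplier);
        }
        return delays;
    }

    public static void main(String[] args) {
        // 1000ms initial, x2 multiplier, 30s cap
        System.out.println(schedule(1000L, 2.0, 30000L));  // [1000, 2000, 4000, 8000, 16000]
    }
}
```

So with the config above a message is retried roughly five times over half a minute before it lands in the dead-letter topic.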

Dead-Letter Topic Consumer

Events that fail all retries land in the .DLT topic:

@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterConsumer {

    private final AlertService alertService;
    private final FailedEventRepository failedEventRepository;

    @KafkaListener(topics = "order-events.DLT", groupId = "dlt-handler")
    public void handleDeadLetter(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) byte[] exceptionClass,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) byte[] exceptionMessage) {

        // DeadLetterPublishingRecoverer writes these headers as raw bytes
        String exception = new String(exceptionClass, StandardCharsets.UTF_8);
        String message = new String(exceptionMessage, StandardCharsets.UTF_8);

        log.error("Dead letter received: topic={}, key={}, exception={}: {}",
            record.topic(), record.key(), exception, message);

        // Store for manual inspection/reprocessing
        failedEventRepository.save(new FailedEvent(
            record.topic(),
            record.key(),
            record.value().toString(),
            exception,
            message,
            Instant.now()
        ));

        // Alert the on-call engineer
        alertService.sendAlert("Dead letter: " + exception + " processing " + record.key());
    }
}

Multiple Event Types on One Topic

@Component
public class NotificationEventConsumer {

    @KafkaListener(topics = "order-events", groupId = "notification-service")
    public void handle(ConsumerRecord<String, Object> record, Acknowledgment ack) {
        Object value = record.value();

        if (value instanceof OrderCreatedEvent event) {
            sendOrderConfirmation(event);
        } else if (value instanceof OrderShippedEvent event) {
            sendShippingNotification(event);
        } else if (value instanceof OrderCancelledEvent event) {
            sendCancellationNotification(event);
        }

        ack.acknowledge();
    }
}

Configure the deserializer to map types:

spring:
  kafka:
    consumer:
      properties:
        spring.json.value.default.type: java.lang.Object
        spring.json.use.type.headers: true
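The type headers are written by Spring's JsonSerializer, so they only help when the producer is also a Spring app. When it isn't (or you'd rather not put class names on the wire), `spring.json.type.mapping` maps short logical tokens to classes instead; the token names and package below are illustrative:

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring.json.type.mapping: orderCreated:com.devopsmonk.events.OrderCreatedEvent,orderShipped:com.devopsmonk.events.OrderShippedEvent,orderCancelled:com.devopsmonk.events.OrderCancelledEvent
```

The producer side takes the same property, so both sides agree on the tokens while remaining free to rename or relocate the actual classes.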

Parallel Consumer with Partitioning

@KafkaListener(
    topics = "order-events",
    groupId = "analytics-service",
    concurrency = "6"   // 6 threads, one per partition
)
public void handleOrderEvent(OrderCreatedEvent event, Acknowledgment ack) {
    analyticsService.recordOrderEvent(event);
    ack.acknowledge();
}

Each thread is assigned one or more partitions; threads beyond the partition count sit idle, so set concurrency ≤ partition count.

Testing Kafka

Use @EmbeddedKafka for unit/integration tests:

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = {"order-events"},
    brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}
)
class OrderEventProducerTest {

    @Autowired KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired EmbeddedKafkaBroker embeddedKafka;

    private KafkaMessageListenerContainer<String, Object> container;
    private final BlockingQueue<ConsumerRecord<String, Object>> records = new LinkedBlockingQueue<>();

    @BeforeEach
    void setup() {
        // Build a test consumer and pipe every record into the queue
        Map<String, Object> props = KafkaTestUtils.consumerProps("test-group", "false", embeddedKafka);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        var consumerFactory = new DefaultKafkaConsumerFactory<String, Object>(props);

        container = new KafkaMessageListenerContainer<>(
            consumerFactory, new ContainerProperties("order-events"));
        container.setupMessageListener((MessageListener<String, Object>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, 1);
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

    @Test
    void publishesOrderCreatedEvent() throws InterruptedException {
        kafkaTemplate.send("order-events", "key-1", new OrderCreatedEvent(
            UUID.randomUUID(), UUID.randomUUID(), "alice@example.com",
            List.of(), BigDecimal.TEN, Instant.now()
        ));

        ConsumerRecord<String, Object> record = records.poll(5, TimeUnit.SECONDS);
        assertThat(record).isNotNull();
        assertThat(record.value()).isInstanceOf(OrderCreatedEvent.class);
    }
}

Or use Testcontainers with a real Kafka:

@SpringBootTest
@Testcontainers
class KafkaIntegrationTest {

    @Container
    @ServiceConnection
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    // Full Kafka available — no mocking
}

What You’ve Learned

  • KafkaTemplate.send(topic, key, value) publishes events — key determines which partition
  • @KafkaListener subscribes to a topic; use Acknowledgment.acknowledge() to commit offsets manually
  • DefaultErrorHandler with ExponentialBackOff retries failed messages, then sends to .DLT
  • Dead-letter consumers handle persistently-failing events — store for inspection, alert on-call
  • Idempotent consumers check for duplicate events before processing — required for at-least-once delivery
  • @EmbeddedKafka or KafkaContainer (Testcontainers) enable reliable Kafka integration tests

Next: Article 44 — Reliable Event Publishing: The Transactional Outbox Pattern — guarantee events are published even when the application crashes mid-transaction.