Kafka CLI: Creating Topics, Producing, and Consuming Messages
Why Learn the CLI First?
Before writing any Spring code, understanding the Kafka CLI tools gives you the ability to:
- Verify your cluster is working correctly
- Inspect topics and partitions
- Debug consumer lag issues
- Replay messages from specific offsets
- Reset consumer groups during incident recovery
All CLI tools are in Kafka’s bin/ directory. In Docker, run them with docker exec:
docker exec kafka <tool> <args>
kafka-topics.sh: Managing Topics
Create a Topic
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 3 \
--replication-factor 1
# Output: Created topic orders.
Key options:
- --partitions: number of partitions (determines max parallelism)
- --replication-factor: how many copies of each partition (must be ≤ number of brokers)
- --config retention.ms=604800000: set topic-level config at creation
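The retention.ms value is easier to read when it is derived from a number of days. A minimal sketch, assuming a hypothetical helper named days_to_ms (it is not part of Kafka's tooling):

```shell
# Hypothetical helper: convert whole days to milliseconds for retention.ms.
days_to_ms() {
  # days * 24 h * 60 min * 60 s * 1000 ms
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 7   # prints 604800000, the retention.ms value shown above
```

The result can be interpolated into the create command, for example --config retention.ms=$(days_to_ms 7).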
List Topics
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
# Output:
# orders
# payments
# __consumer_offsets
Describe a Topic
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic orders
Topic: orders TopicId: xyz PartitionCount: 3 ReplicationFactor: 1
Topic: orders Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: orders Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: orders Partition: 2 Leader: 1 Replicas: 1 Isr: 1
The columns:
- Leader: broker ID that currently leads this partition
- Replicas: all broker IDs that hold a copy
- Isr: in-sync replicas (subset of Replicas that are caught up)
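On a multi-broker cluster, a partition whose Isr list is shorter than its Replicas list is under-replicated. The sketch below shows one way to spot that from the describe output. The function name under_replicated and the three-broker sample lines are assumptions made for illustration:

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print the partition numbers whose ISR is smaller than the replica list.
under_replicated() {
  awk '
    /Partition:/ {
      partition = ""; replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") partition = $(i + 1)
        if ($i == "Replicas:")  replicas  = $(i + 1)
        if ($i == "Isr:")       isr       = $(i + 1)
      }
      # Broker IDs are comma-separated, so split() returns how many there are.
      if (split(isr, a, ",") < split(replicas, b, ",")) print partition
    }
  '
}

# Sample input: partition 1 has three replicas but only two in sync.
printf '%s\n' \
  'Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3' \
  'Topic: orders Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3' |
  under_replicated   # prints 1
```

kafka-topics.sh also accepts --under-replicated-partitions together with --describe, which is the more direct route when you only need that filter.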
Alter Topic Configuration
# Change retention to 3 days
docker exec kafka kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name orders \
--alter \
--add-config retention.ms=259200000
# Add partitions (can only increase, never decrease; existing keys may map to a different partition afterwards)
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--alter \
--topic orders \
--partitions 6
Delete a Topic
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--delete \
--topic orders
Deletion is permanent. The broker marks the topic for deletion and removes its data asynchronously, usually within moments (assuming delete.topic.enable=true, the default).
kafka-console-producer.sh: Sending Messages
Send Messages Without a Key
docker exec -it kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders
The producer opens an interactive prompt. Type each message and press Enter:
>{"orderId":"1001","customerId":"cust-1","total":99.99}
>{"orderId":"1002","customerId":"cust-2","total":149.50}
>{"orderId":"1003","customerId":"cust-3","total":29.99}
>^C
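Messages without keys are not tied to any partition. Older clients distribute them round-robin; since Kafka 2.4 the default partitioner is sticky, sending a whole batch to one partition before moving on to another.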
Send Messages With Keys
flowchart LR
Msg1["key=cust-1\nvalue=OrderPlaced"]
Msg2["key=cust-2\nvalue=OrderPlaced"]
Msg3["key=cust-1\nvalue=OrderShipped"]
Msg1 -->|"hash(cust-1) % 3 = 0"| P0["Partition 0"]
Msg2 -->|"hash(cust-2) % 3 = 1"| P1["Partition 1"]
Msg3 -->|"hash(cust-1) % 3 = 0"| P0
docker exec -it kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--property parse.key=true \
--property key.separator=:
Now type messages as key:value:
>cust-1:{"orderId":"1001","customerId":"cust-1","total":99.99}
>cust-2:{"orderId":"1002","customerId":"cust-2","total":149.50}
>cust-1:{"orderId":"1003","customerId":"cust-1","total":29.99}
>^C
Both messages for cust-1 land on the same partition — their relative order is preserved.
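The hash-then-modulo routing can be illustrated without a cluster. This is only a sketch: Kafka's default partitioner hashes the serialized key with murmur2, whereas the hypothetical partition_for helper below substitutes cksum, so the partition numbers it prints will not match what Kafka picks. It shows only that the mapping is deterministic and depends on the partition count.

```shell
# Illustration only: cksum (CRC) stands in for Kafka's murmur2 hash purely
# to demonstrate the "hash(key) % partitions" step.
partition_for() {
  key=$1
  partitions=$2
  hash=$(printf '%s' "$key" | cksum | awk '{ print $1 }')
  echo $(( hash % partitions ))
}

partition_for cust-1 3   # same key and partition count: always the same result
partition_for cust-1 3
partition_for cust-1 6   # a different partition count may give a different result
```

Because the result depends on the partition count, adding partitions to a topic can move a key to a different partition for records produced afterwards.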
Send from a File
# orders.txt
cust-1:{"orderId":"1001","customerId":"cust-1","total":99.99}
cust-2:{"orderId":"1002","customerId":"cust-2","total":149.50}
# -i keeps stdin open so the redirected file actually reaches the producer
docker exec -i kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--property parse.key=true \
--property key.separator=: \
< orders.txt
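With parse.key=true, a line that lacks the separator has no key to parse, and the console producer will typically stop with an error on it. A quick pre-flight check can catch such lines before sending. The sketch below assumes a hypothetical helper named find_unkeyed_lines and the ":" separator used above:

```shell
# Hypothetical pre-flight check: print the line numbers of entries that lack
# a non-empty key before the first ":" separator.
find_unkeyed_lines() {
  awk -F: 'NF < 2 || $1 == "" { print NR }' "$1"
}

# Build a small sample file; its second line has no separator at all.
sample=$(mktemp)
printf '%s\n' \
  'cust-1:{"orderId":"1001","customerId":"cust-1","total":99.99}' \
  'orphan line without a key' > "$sample"

find_unkeyed_lines "$sample"   # prints 2
rm -f "$sample"
```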
kafka-console-consumer.sh: Reading Messages
Consume New Messages (Live Tail)
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders
Waits for new messages and prints them as they arrive. This is the default — equivalent to tail -f for a Kafka topic.
Consume From the Beginning
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
Reads every record still retained in the topic, starting from the earliest available offset. For a consumer with no committed offsets, this has the same effect as auto.offset.reset=earliest.
Display Keys and Metadata
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--property print.key=true \
--property key.separator=" => " \
--property print.partition=true \
--property print.offset=true \
--property print.timestamp=true
Output:
CreateTime:1714820400000 Partition:0 Offset:0 cust-1 => {"orderId":"1001",...}
CreateTime:1714820401000 Partition:1 Offset:0 cust-2 => {"orderId":"1002",...}
CreateTime:1714820402000 Partition:0 Offset:1 cust-1 => {"orderId":"1003",...}
Notice both messages for cust-1 are on Partition 0, in order.
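To see how records are spread across partitions, the consumer output can be summarized with awk. This is a rough sketch with a hypothetical helper name, count_per_partition. It assumes only that each line contains a Partition:N token, as in the output above, and it would miscount if a message value happened to contain the same text.

```shell
# Hypothetical helper: count records per partition from console-consumer
# output that was produced with print.partition=true.
count_per_partition() {
  awk '
    match($0, /Partition:[0-9]+/) { count[substr($0, RSTART, RLENGTH)]++ }
    END { for (p in count) print p, count[p] }
  ' | sort
}

printf '%s\n' \
  'CreateTime:1714820400000 Partition:0 Offset:0 cust-1 => {"orderId":"1001",...}' \
  'CreateTime:1714820401000 Partition:1 Offset:0 cust-2 => {"orderId":"1002",...}' \
  'CreateTime:1714820402000 Partition:0 Offset:1 cust-1 => {"orderId":"1003",...}' |
  count_per_partition
# prints:
# Partition:0 2
# Partition:1 1
```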
Consume with a Consumer Group
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--group inventory-service-cli
This creates a consumer group named inventory-service-cli. Kafka tracks its offset in __consumer_offsets. If you stop and restart this command, it resumes from where it left off (not from the beginning, despite --from-beginning — that flag only applies when the group has no committed offset).
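The rule for where a group starts reading can be written as a small decision function. This is a sketch of the behavior described above, not Kafka's actual code. The function start_offset and its arguments are hypothetical.

```shell
# Arguments: committed offset ("" if the group has none), earliest offset,
# latest offset, and "yes"/"no" for whether --from-beginning was given.
start_offset() {
  committed=$1; earliest=$2; latest=$3; from_beginning=$4
  if [ -n "$committed" ]; then
    echo "$committed"          # a committed offset always wins
  elif [ "$from_beginning" = "yes" ]; then
    echo "$earliest"           # new group with --from-beginning
  else
    echo "$latest"             # new group without it: only new records
  fi
}

start_offset "" 0 42 yes   # prints 0
start_offset 17 0 42 yes   # prints 17
start_offset "" 0 42 no    # prints 42
```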
Consume a Specific Number of Messages
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--max-messages 5
kafka-consumer-groups.sh: Monitoring Consumer Groups
List All Groups
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# Output:
# inventory-service
# notification-service
# inventory-service-cli
Describe a Group (Shows Lag)
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group inventory-service
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
inventory-service orders 0 42 42 0 consumer-1-...
inventory-service orders 1 38 40 2 consumer-2-...
inventory-service orders 2 55 55 0 consumer-2-...
The LAG column shows records not yet consumed per partition. Partition 1 has lag of 2 — the consumer is 2 records behind.
flowchart LR
P0["Partition 0\nLag: 0 ✓"]
P1["Partition 1\nLag: 2 ⚠"]
P2["Partition 2\nLag: 0 ✓"]
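Each LAG value is LOG-END-OFFSET minus CURRENT-OFFSET (for partition 1 above, 40 - 38 = 2). For a single number to alert on, the column can be summed. The helper name total_lag is an assumption, and the sketch finds the LAG column from the header row instead of hard-coding its position:

```shell
# Hypothetical helper: sum the LAG column of
# `kafka-consumer-groups.sh --describe` output read from stdin.
total_lag() {
  awk '
    # Until the header row is found, look for the LAG column index.
    lag_col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i; next }
    # Count only numeric lag values (a "-" means no committed offset yet).
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}

printf '%s\n' \
  'GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID' \
  'inventory-service orders 0 42 42 0 consumer-1-...' \
  'inventory-service orders 1 38 40 2 consumer-2-...' \
  'inventory-service orders 2 55 55 0 consumer-2-...' |
  total_lag   # prints 2
```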
Describe All Groups at Once
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--all-groups
Resetting Consumer Group Offsets
When you need to replay events — after a bug fix, during testing, or for a new consumer — reset the offsets.
flowchart TD
Decision{Reset strategy}
Earliest["--to-earliest\nReset to earliest retained offset\n(replay everything still in the log)"]
Latest["--to-latest\nReset to current end\n(skip all historical)"]
Offset["--to-offset N\nReset to specific offset"]
DateTime["--to-datetime 2024-01-15T10:00:00.000\nReset to timestamp"]
Shift["--shift-by -100\nGo back 100 records"]
Decision --> Earliest
Decision --> Latest
Decision --> Offset
Decision --> DateTime
Decision --> Shift
# Reset to beginning (replay all messages)
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group inventory-service \
--topic orders \
--reset-offsets --to-earliest \
--execute
# Reset to a specific offset on partition 0
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group inventory-service \
--topic orders:0 \
--reset-offsets --to-offset 100 \
--execute
# Go back 50 records from current position
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group inventory-service \
--topic orders \
--reset-offsets --shift-by -50 \
--execute
# Reset to a specific timestamp
docker exec kafka kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group inventory-service \
--topic orders \
--reset-offsets --to-datetime 2024-01-15T00:00:00.000 \
--execute
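Important: the consumer group must be inactive (no running members) before you reset its offsets; otherwise the command fails. To preview the new offsets without changing anything, replace --execute with --dry-run.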
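The arithmetic behind --shift-by is simple: the new offset is the current offset plus the shift. The sketch below assumes the result is kept within the partition's valid range, which is how the tool is understood to behave when a shift would overshoot. The function shifted_offset and its arguments are hypothetical.

```shell
# Sketch of the --shift-by arithmetic, assuming the result is clamped to the
# partition's valid range [earliest, latest].
shifted_offset() {
  current=$1; shift_by=$2; earliest=$3; latest=$4
  target=$(( current + shift_by ))
  if [ "$target" -lt "$earliest" ]; then target=$earliest; fi
  if [ "$target" -gt "$latest" ]; then target=$latest; fi
  echo "$target"
}

shifted_offset 120 -50 0 200   # prints 70
shifted_offset 30 -50 0 200    # prints 0 (cannot go below the earliest offset)
```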
Producing and Consuming in Order
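A key guarantee in Kafka: records within a partition are consumed in the order they were written. Because records with the same key go to the same partition (as long as the partition count does not change), events for one key are consumed in the order they were produced.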
sequenceDiagram
participant Producer
participant P0 as Partition 0\n(key=cust-1)
participant Consumer
Producer->>P0: offset 0: OrderPlaced (cust-1)
Producer->>P0: offset 1: PaymentProcessed (cust-1)
Producer->>P0: offset 2: OrderShipped (cust-1)
Consumer->>P0: Fetch from offset 0
P0-->>Consumer: offset 0: OrderPlaced ✓
Consumer->>P0: Fetch from offset 1
P0-->>Consumer: offset 1: PaymentProcessed ✓
Consumer->>P0: Fetch from offset 2
P0-->>Consumer: offset 2: OrderShipped ✓
Note over Consumer: Events for cust-1 always arrive\nin the order they were produced
Test this with the CLI:
# Producer: send ordered events for cust-1
docker exec -it kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--property parse.key=true \
--property key.separator=:
# Type:
# cust-1:{"event":"OrderPlaced","orderId":"1001"}
# cust-1:{"event":"PaymentProcessed","orderId":"1001"}
# cust-1:{"event":"OrderShipped","orderId":"1001"}
# Consumer: verify order
docker exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--property print.key=true \
--property print.partition=true \
--property print.offset=true
All three cust-1 events appear on the same partition in the same order.
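The same check can be automated by piping the consumer output through a small filter that reports any key seen on more than one partition. This is a sketch under a stated assumption: with the properties above and the default separator, each line is taken to be tab-separated in the order Partition:N, Offset:M, key, value. The helper name keys_on_multiple_partitions is hypothetical.

```shell
# Hypothetical check: read console-consumer output on stdin and print any key
# that appears on more than one partition.
# Assumed line format: Partition:N<TAB>Offset:M<TAB>key<TAB>value
keys_on_multiple_partitions() {
  awk -F'\t' '
    {
      partition = $1; key = $3
      if (!(key in seen)) { seen[key] = partition }
      else if (seen[key] != partition) { bad[key] = 1 }
    }
    END { for (k in bad) print k }
  '
}

printf '%s\t%s\t%s\t%s\n' \
  Partition:0 Offset:0 cust-1 '{"event":"OrderPlaced","orderId":"1001"}' \
  Partition:0 Offset:1 cust-1 '{"event":"PaymentProcessed","orderId":"1001"}' \
  Partition:0 Offset:2 cust-1 '{"event":"OrderShipped","orderId":"1001"}' |
  keys_on_multiple_partitions   # prints nothing: cust-1 stayed on one partition
```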
kafka-configs.sh: Viewing and Changing Configuration
# View topic configuration (only non-default values)
docker exec kafka kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name orders \
--describe
# View all configuration including defaults
docker exec kafka kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name orders \
--describe \
--all
# Change message max bytes for a topic
docker exec kafka kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name orders \
--alter \
--add-config max.message.bytes=2097152
Topics Needed for This Series
Create all topics used across the tutorial series at once:
for TOPIC in orders payments inventory-updates notifications; do
  docker exec kafka kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic "$TOPIC" \
    --partitions 3 \
    --replication-factor 1 \
    --if-not-exists
done
# Verify all created
docker exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
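Rather than reading the list by eye, the output of --list can be compared against the topics the series expects. A minimal sketch, assuming a hypothetical helper named missing_topics that reads the list on stdin:

```shell
# Hypothetical helper: read `kafka-topics.sh --list` output on stdin and
# print each required topic (given as arguments) that is not in the list.
missing_topics() {
  existing=$(cat)
  for topic in "$@"; do
    printf '%s\n' "$existing" | grep -qxF -- "$topic" || echo "$topic"
  done
}

# Sample list in which two of the four topics have not been created yet.
printf '%s\n' orders payments __consumer_offsets |
  missing_topics orders payments inventory-updates notifications
# prints:
# inventory-updates
# notifications
```

Against a running cluster, the sample printf would be replaced by the docker exec ... --list command shown above.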
Key Takeaways
- kafka-topics.sh creates, describes, lists, alters, and deletes topics
- kafka-console-producer.sh with --property parse.key=true sends keyed messages; keys route to the same partition every time
- kafka-console-consumer.sh --from-beginning reads all historical records; without it, only new records are received
- kafka-consumer-groups.sh --describe shows LAG per partition, the primary indicator of a healthy vs. struggling consumer
- Offset reset is a powerful recovery tool: --to-earliest, --to-latest, --to-offset, --to-datetime, --shift-by
- Records with the same key on the same partition are always consumed in production order
Next: Kafka Producer in Spring Boot: KafkaTemplate Basics — replace the CLI producer with a Spring Boot application using KafkaTemplate.