Kafka CLI: Creating Topics, Producing, and Consuming Messages

Why Learn the CLI First?

Before writing any Spring code, it pays to know the Kafka CLI tools. They let you:

  • Verify your cluster is working correctly
  • Inspect topics and partitions
  • Debug consumer lag issues
  • Replay messages from specific offsets
  • Reset consumer groups during incident recovery

All CLI tools are in Kafka’s bin/ directory. In Docker, run them with docker exec:

docker exec kafka <tool> <args>

kafka-topics.sh: Managing Topics

Create a Topic

docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 3 \
  --replication-factor 1

# Output: Created topic orders.

Key options:

  • --partitions: number of partitions (determines max parallelism)
  • --replication-factor: how many copies of each partition (must be ≤ number of brokers)
  • --config retention.ms=604800000: set topic-level config at creation
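Retention values like 604800000 are milliseconds, which makes them easy to misread. A quick sanity check (a small Python helper for the arithmetic, not a Kafka API):

```python
def days_to_retention_ms(days: int) -> int:
    """Convert a retention period in days to the milliseconds Kafka expects."""
    return days * 24 * 60 * 60 * 1000

# 7 days, as used in --config retention.ms=604800000
print(days_to_retention_ms(7))   # 604800000
# 3 days, as used later with kafka-configs.sh
print(days_to_retention_ms(3))   # 259200000
```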

List Topics

docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

# Output:
# orders
# payments
# __consumer_offsets

Describe a Topic

docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic orders

# Output:
Topic: orders   TopicId: xyz   PartitionCount: 3   ReplicationFactor: 1
  Topic: orders   Partition: 0   Leader: 1   Replicas: 1   Isr: 1
  Topic: orders   Partition: 1   Leader: 1   Replicas: 1   Isr: 1
  Topic: orders   Partition: 2   Leader: 1   Replicas: 1   Isr: 1

The columns:

  • Leader: broker ID that currently leads this partition
  • Replicas: all broker IDs that hold a copy
  • Isr: in-sync replicas (subset of Replicas that are caught up)

Alter Topic Configuration

# Change retention to 3 days
docker exec kafka kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name orders \
  --alter \
  --add-config retention.ms=259200000

# Add partitions (can only increase, never decrease)
docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --alter \
  --topic orders \
  --partitions 6

Delete a Topic

docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --topic orders

Deletion is permanent: the broker marks the topic for deletion and removes its data shortly afterward (assuming delete.topic.enable=true, the default).


kafka-console-producer.sh: Sending Messages

Send Messages Without a Key

docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders

The producer opens an interactive prompt. Type each message and press Enter:

>{"orderId":"1001","customerId":"cust-1","total":99.99}
>{"orderId":"1002","customerId":"cust-2","total":149.50}
>{"orderId":"1003","customerId":"cust-3","total":29.99}
>^C

Messages without keys are spread across partitions: older clients round-robin per record, while modern clients use sticky partitioning, filling a batch for one partition before switching to another.

Send Messages With Keys

With keys, the default partitioner hashes each key and takes the result modulo the partition count, so a given key always maps to the same partition:

flowchart LR
    Msg1["key=cust-1\nvalue=OrderPlaced"]
    Msg2["key=cust-2\nvalue=OrderPlaced"]
    Msg3["key=cust-1\nvalue=OrderShipped"]

    Msg1 -->|"hash(cust-1) % 3 = 0"| P0["Partition 0"]
    Msg2 -->|"hash(cust-2) % 3 = 1"| P1["Partition 1"]
    Msg3 -->|"hash(cust-1) % 3 = 0"| P0

docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property parse.key=true \
  --property key.separator=:

Now type messages as key:value:

>cust-1:{"orderId":"1001","customerId":"cust-1","total":99.99}
>cust-2:{"orderId":"1002","customerId":"cust-2","total":149.50}
>cust-1:{"orderId":"1003","customerId":"cust-1","total":29.99}
>^C

Both messages for cust-1 land on the same partition — their relative order is preserved.
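The routing rule can be sketched in a few lines. Kafka's actual default partitioner hashes the key bytes with murmur2; the sketch below substitutes CRC32 as a stand-in hash, which is enough to show the property that matters — the same key always yields the same partition:

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Simplified keyed partitioning: same key -> same partition.
    (Kafka's real default partitioner uses murmur2, not CRC32.)"""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

p1 = partition_for("cust-1", 3)
p2 = partition_for("cust-2", 3)
# Every cust-1 record lands on the same partition as every other cust-1 record
assert partition_for("cust-1", 3) == p1
print(p1, p2)
```

Note the modulus: if you later add partitions (as in the --alter example above), num_partitions changes and existing keys may start mapping to different partitions, which breaks per-key ordering across the boundary.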

Send from a File

# orders.txt
cust-1:{"orderId":"1001","customerId":"cust-1","total":99.99}
cust-2:{"orderId":"1002","customerId":"cust-2","total":149.50}

docker exec -i kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property parse.key=true \
  --property key.separator=: \
  < orders.txt

kafka-console-consumer.sh: Reading Messages

Consume New Messages (Live Tail)

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders

Waits for new messages and prints them as they arrive. This is the default — equivalent to tail -f for a Kafka topic.

Consume From the Beginning

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

Reads every record ever written to the topic (within retention). This is the same as auto.offset.reset=earliest for a new consumer group.

Display Keys and Metadata

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" => " \
  --property print.partition=true \
  --property print.offset=true \
  --property print.timestamp=true

Output:

CreateTime:1714820400000   Partition:0   Offset:0   cust-1 => {"orderId":"1001",...}
CreateTime:1714820401000   Partition:1   Offset:0   cust-2 => {"orderId":"1002",...}
CreateTime:1714820402000   Partition:0   Offset:1   cust-1 => {"orderId":"1003",...}

Notice both messages for cust-1 are on Partition 0, in order.

Consume with a Consumer Group

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --group inventory-service-cli

This creates a consumer group named inventory-service-cli. Kafka tracks its offset in __consumer_offsets. If you stop and restart this command, it resumes from where it left off (not from the beginning, despite --from-beginning — that flag only applies when the group has no committed offset).
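That resume behavior can be captured as a small decision function — an illustration of the semantics, not Kafka source code:

```python
def starting_offset(committed, reset_policy, earliest, log_end):
    """Where a consumer group starts reading one partition.

    committed     -- last committed offset for the group, or None
    reset_policy  -- 'earliest' (what --from-beginning requests) or 'latest'
    """
    if committed is not None:          # group has history: always resume there
        return committed
    return earliest if reset_policy == "earliest" else log_end

# First run with --from-beginning: no committed offset -> start at the beginning
print(starting_offset(None, "earliest", earliest=0, log_end=42))   # 0
# Restart after committing offset 42: the flag is ignored
print(starting_offset(42, "earliest", earliest=0, log_end=57))     # 42
```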

Consume a Specific Number of Messages

docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --max-messages 5

kafka-consumer-groups.sh: Monitoring Consumer Groups

List All Groups

docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --list

# Output:
# inventory-service
# notification-service
# inventory-service-cli

Describe a Group (Shows Lag)

docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group inventory-service

# Output:
GROUP             TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID
inventory-service orders  0          42              42              0     consumer-1-...
inventory-service orders  1          38              40              2     consumer-2-...
inventory-service orders  2          55              55              0     consumer-2-...

The LAG column shows records not yet consumed per partition. Partition 1 has lag of 2 — the consumer is 2 records behind.
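Lag is simply the log-end offset minus the committed offset, per partition; summing across partitions gives the group's total backlog for the topic. Using the numbers from the describe output above:

```python
# (partition, current_offset, log_end_offset) from the describe output
partitions = [(0, 42, 42), (1, 38, 40), (2, 55, 55)]

lags = {p: log_end - current for p, current, log_end in partitions}
print(lags)               # {0: 0, 1: 2, 2: 0}
print(sum(lags.values())) # total backlog: 2
```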

flowchart LR
    P0["Partition 0\nLag: 0 ✓"]
    P1["Partition 1\nLag: 2 ⚠"]
    P2["Partition 2\nLag: 0 ✓"]

Describe All Groups at Once

docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --all-groups

Resetting Consumer Group Offsets

When you need to replay events — after a bug fix, during testing, or for a new consumer — reset the offsets.

flowchart TD
    Decision{Reset strategy}
    Earliest["--to-earliest\nReset to offset 0\n(replay everything)"]
    Latest["--to-latest\nReset to current end\n(skip all historical)"]
    Offset["--to-offset N\nReset to specific offset"]
    DateTime["--to-datetime 2024-01-15T10:00:00.000\nReset to timestamp"]
    Shift["--shift-by -100\nGo back 100 records"]

    Decision --> Earliest
    Decision --> Latest
    Decision --> Offset
    Decision --> DateTime
    Decision --> Shift

# Reset to beginning (replay all messages)
docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group inventory-service \
  --topic orders \
  --reset-offsets --to-earliest \
  --execute

# Reset to a specific offset on partition 0
docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group inventory-service \
  --topic orders:0 \
  --reset-offsets --to-offset 100 \
  --execute

# Go back 50 records from current position
docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group inventory-service \
  --topic orders \
  --reset-offsets --shift-by -50 \
  --execute

# Reset to a specific timestamp
docker exec kafka kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group inventory-service \
  --topic orders \
  --reset-offsets --to-datetime 2024-01-15T00:00:00.000 \
  --execute

Important: the consumer group must be stopped before resetting offsets (no active members).
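Each strategy boils down to computing a target offset per partition. The sketch below illustrates the semantics under the assumption that out-of-range results are clamped to the partition's valid range (which is how the tool handles, e.g., a shift-by past the log start):

```python
def reset_target(strategy, value, current, earliest, log_end):
    """Target offset for one partition under each reset strategy (sketch)."""
    if strategy == "to-earliest":
        target = earliest
    elif strategy == "to-latest":
        target = log_end
    elif strategy == "to-offset":
        target = value
    elif strategy == "shift-by":               # value may be negative
        target = current + value
    else:
        raise ValueError(strategy)
    return max(earliest, min(target, log_end))  # clamp to valid range

# Partition with earliest=0, log_end=200, group currently at offset 120:
print(reset_target("to-earliest", None, 120, 0, 200))  # 0
print(reset_target("shift-by", -50, 120, 0, 200))      # 70
print(reset_target("shift-by", -500, 120, 0, 200))     # clamped to 0
```

Tip: replacing --execute with --dry-run prints the target offsets without applying them, which is a safe way to check a reset before running it for real.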


Producing and Consuming in Order

A key guarantee in Kafka: records with the same key on the same partition are always consumed in the order they were produced.

sequenceDiagram
    participant Producer
    participant P0 as Partition 0\n(key=cust-1)
    participant Consumer

    Producer->>P0: offset 0: OrderPlaced (cust-1)
    Producer->>P0: offset 1: PaymentProcessed (cust-1)
    Producer->>P0: offset 2: OrderShipped (cust-1)

    Consumer->>P0: Fetch from offset 0
    P0-->>Consumer: offset 0: OrderPlaced ✓
    Consumer->>P0: Fetch from offset 1
    P0-->>Consumer: offset 1: PaymentProcessed ✓
    Consumer->>P0: Fetch from offset 2
    P0-->>Consumer: offset 2: OrderShipped ✓

    Note over Consumer: Events for cust-1 always arrive\nin the order they were produced

Test this with the CLI:

# Producer: send ordered events for cust-1
docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property parse.key=true \
  --property key.separator=:

# Type:
# cust-1:{"event":"OrderPlaced","orderId":"1001"}
# cust-1:{"event":"PaymentProcessed","orderId":"1001"}
# cust-1:{"event":"OrderShipped","orderId":"1001"}

# Consumer: verify order
docker exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --property print.key=true \
  --property print.partition=true \
  --property print.offset=true

All three cust-1 events appear on the same partition in the same order.


kafka-configs.sh: Viewing and Changing Configuration

# View topic configuration (only non-default values)
docker exec kafka kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name orders \
  --describe

# View all configuration including defaults
docker exec kafka kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name orders \
  --describe \
  --all

# Change message max bytes for a topic
docker exec kafka kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name orders \
  --alter \
  --add-config max.message.bytes=2097152
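The value 2097152 is just 2 MiB expressed in bytes:

```python
# max.message.bytes=2097152 is 2 MiB in bytes
print(2 * 1024 * 1024)   # 2097152
```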

Topics Needed for This Series

Create all topics used across the tutorial series at once:

for TOPIC in orders payments inventory-updates notifications; do
  docker exec kafka kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --topic $TOPIC \
    --partitions 3 \
    --replication-factor 1 \
    --if-not-exists
done

# Verify all created
docker exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

Key Takeaways

  • kafka-topics.sh creates, describes, lists, alters, and deletes topics
  • kafka-console-producer.sh with --property parse.key=true sends keyed messages — keys route to the same partition every time
  • kafka-console-consumer.sh --from-beginning reads all historical records; without it, only new records are received
  • kafka-consumer-groups.sh --describe shows LAG per partition — the primary indicator of a healthy vs. struggling consumer
  • Offset reset is a powerful recovery tool: --to-earliest, --to-latest, --to-offset, --to-datetime, --shift-by
  • Records with the same key on the same partition are always consumed in production order

Next: Kafka Producer in Spring Boot: KafkaTemplate Basics — replace the CLI producer with a Spring Boot application using KafkaTemplate.