Notes on Kafka: Kafka CLI Commands

These section contains useful commands in running Kafka. note that I encountered some errors while playing around with the commands. I've included the errors at the bottom and also how I fixed them. The contents of this section are:
For a quick-read cheat sheet, you may also jump ahead to the bottom of this page, to the Cheat Sheet section.
kafka-topics
We initially installed v2.7.0 in our machine when we discussed Kafka installation on EC2, but the following commands is helpful if you'll be working on Kafka inrastructure that's been built by others. The latest release and the current stable version as of this writing is v2.8.0.
To check the version
[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
Note that you must have have both Zookeeper and Kafka running before creating a topic. You may check out the previous write-up for a single-node (single-broker) Kafka cluster, Kafka on EC2. I highly suggest you go through the previous one since this article build upon the knowledge gain from the previous article.
Similarly, you can open three terminals. Start the Zookeeper first in the first terminal, and then Kafka on the second terminal using the command below. Then all other operations you can do on the third terminal.
# On terminal 1
zookeeper-server-start.sh /$KAFKA-HOME/zookeeper.properties

# On terminal 2
kafka-server-start.sh /$KAFKA-HOME/server.properties
To create a topic, we need to specify:
  • kafka-topics.sh
  • --create --topic, followed by the topic name
  • --partition, followed by how many partition you want,
  • --replication-factor, followed by a number which should be equal or less than the number of brokers.
  • --zookeeper, followed by local machine's and port 2181
  • as of v2.2, we use --bootstrap-server which runs on port 9092 instead.
  • In the example below, we're creating two topics using the deprecated way and the new approach. Note that localhost and 127.0.0.1 can be used interchangeably.
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --topic test-topic-1 --create \
    --partitions 3 --replication-factor 1 \
    --zookeeper 127.0.0.1:2181
    
    Created topic test-topic-1.
    
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --create --topic test-topic-2 \
    --partitions 4 --replication-factor 1 \
    --bootstrap-server 127.0.0.1:9092
    
    Created topic test-topic-2.
    
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --create --topic test-topic-3 \
    --partitions 5 --replication-factor 1 \
    --zookeeper localhost:2181
    
    Created topic test-topic-3.
    To list the topics,
    [root@hcptstkafka1 ~]# kafka-topics.sh --list \
    --bootstrap-server localhost:9092
    
    test-topic-1
    test-topic-2
    test-topic-3
    
    [root@hcptstkafka1 ~]# kafka-topics.sh --list \
    --zookeeper localhost:2181
    
    test-topic-1
    test-topic-2
    test-topic-3
    To delete a topic, run the command/s below. Note that when you run the --delete with the new approach, no output is returned.
    On the other hand, using the deprecated method with the --delete will return a message saying that the topic is is marked for deletion.
    Either methods will work and the topics deleted will not appear anymore when you try to list the topics again. Let's try to delete test-topic-2 and test-topic-3
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --delete --topic test-topic-2 \
    --zookeeper localhost:2181
    
    Topic test-topic-2 is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    [root@hcptstkafka1 ~]# 
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --delete --topic test-topic-3 \
    --bootstrap-server localhost:9092
    [root@hcptstkafka1 ~]# 
    [root@hcptstkafka1 ~]# kafka-topics.sh --list \
    --bootstrap-server localhost:9092
    
    test-topic-1
    To get more details about a certain topic,
    [root@hcptstkafka1 ~]# kafka-topics.sh \
    --describe --topic test-topic-1 \
    --bootstrap-server localhost:9092
    
    Topic: test-topic-1     PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: test-topic-1     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
            Topic: test-topic-1     Partition: 1    Leader: 0       Replicas: 0     Isr: 0
            Topic: test-topic-1     Partition: 2    Leader: 0       Replicas: 0     Isr: 0
    Let's create two more example topics.
    [root@hcptstkafka1 kafka]# kafka-topics.sh \
    --create --topic test-eden-1 \
    --partitions 3 --replication-factor 1 \
    --bootstrap-server localhost:9092
    
    Created topic test-eden-1.
    
    [root@hcptstkafka1 kafka]# kafka-topics.sh \
    --create --topic test-tina-1 \
    --partitions 3 --replication-factor 1 \
    --bootstrap-server localhost:9092
    
    Created topic test-tina-1.
    Going back to the our $KAFKA-HOME folder, which is where are Kafka package was previously installed, we can check the new logs created in the data/kafka folder.
    Here we can see logs were created for the first topic, test-topic-1 and for the two new topics. test-eden-1 and test-tina-1. Note that each partition will also have a log.
    Since we set three partitions for each topic, and there are three topics in total, we should see 9 new logs.
    [root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
    [root@hcptstkafka1 kafka]# ll
    total 20
    -rw-r--r-- 1 root root   4 Aug 15 07:25 cleaner-offset-checkpoint       
    -rw-r--r-- 1 root root   4 Aug 15 07:38 log-start-offset-checkpoint     
    -rw-r--r-- 1 root root  88 Aug 15 06:13 meta.properties
    -rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint   
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
    drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
    drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
    drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
    drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
    [root@hcptstkafka1 kafka]# 
    [root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
    test-eden-1
    test-tina-1
    test-topic-1
    kafka-console-producer
    To run this command, the following is required
  • specify brokers using broker-list - deprecated
  • as replacement to brokers-list, specify bootstrap-server
  • specify topic using topic
  • When you run the command with the required parameters, it should return a caret (>). You can now send a message to the topic. To exit, hit Ctrl-c.
    [root@hcptstkafka1 ~] kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic test-topic-1
    >Hello world!
    >This is the first message that we sent to a topic!                               D                   D           
    >^Z
    
    [root@hcptstkafka1 ~] kafka-console-producer.sh \
    --bootstrap-server localhost:9092
    >Awesome job on this!
    >Alright!
    >^Z
    What if I sent a message to a non-existing topic
    When you produce a message and specified a topic that is still not create, it will return an error saying "LEADER_NOT_AVAILABLE". You'll still be allowed to type in a message until you exit out.
    What happens here is the producer tries to send a message to a non-existing topic but was greeted with a warning. However, the producer is smart enough to recover and wait for the Kafka topic to be created. Once the topic is created, you can continue sending messages to the now-created topic.
    [root@hcptstkafka1 ~] kafka-console-producer.sh \
    --broker-list localhost:9092 --topic test-topic-10
    > Third set of messages!
    [2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    >Wait there's error but I'm still able to type in a message
    >^Z
    
    # the new topic should now appear when you try to list the topics
    [root@hcptstkafka1 ~] kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --list
    
    test-topic-1
    test-topic-10
    The topic is created with the default values. You can see when you try to run describe.
    [root@hcptstkafka1 ~] kafka-topics.sh \
    --describe --topic test-topic-10 \
    --bootstrap-server localhost:9092
    
    Topic: test-topic-10    PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: test-topic-10    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
    The topic will default to 1 partition and a replication factor of 1. This isn't best practice thus it is recommended to ensure that the destination topics are created.
    You can also change this default value by editing the config/server.properties
    [root@hcptstkafka1 ~] pwd
    /usr/local/bin/kafka
    [root@hcptstkafka1 kafka]
    [root@hcptstkafka1 kafka] vi config/server.properties 
    
    ## Some of the output omitted ##
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/usr/local/bin/kafka/data/kafka
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=4
    Whenever we edit the properties file, we'll need to stop the Kafka service and then run it again for the changes to take effect. On the terminal where kafka-server-start.sh was run, hit Ctrl-C then run the command again.
    Now if we try to send a message again to a non-existing, the topic will be created with the new default value.
    # Send the message
    [root@hcptstkafka1 ~] kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic test-topic-100
    >Another message sent!
    [2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
    >Oops! But hey, still sending message!
    >^Z
    
    # List the topics and get the details of the newly-created topic.
    [root@hcptstkafka1 ~] kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --list
    test-topic-1
    test-topic-10
    test-topic-100
    [root@hcptstkafka1 ~] kafka-topics.sh \
    --describe --topic test-topic-100 \
    --bootstrap-server localhost:9092 
    
    Topic: test-topic-100   PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: test-topic-100   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
            Topic: test-topic-100   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
            Topic: test-topic-100   Partition: 2    Leader: 0       Replicas: 0     Isr: 0
            Topic: test-topic-100   Partition: 3    Leader: 0       Replicas: 0     Isr: 0
    Recall that producers can choose to accept acknowledgements from the topic that it has received the message. This is done through the acks.
    We'll send a message again but this time we specify that we want confirmation from the topic.
    [root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
    > This is my second batch of message sent to the topic.
    >I should get confirmation from the topic
    kafka-console-consumer
    Here we'll verify the messages sent to the topics. Similar with the producer, you also need to specify some parameters:
  • bootstrap-server
  • topic
  • However, it will only read the real-time messages that was sent AFTER kafka-console-consumer is ran.
    If we try to run kafka-console-consumer and then run kafka-console-producer on another window and send a message again to the topic, the console consumer will now be able to see it in real-time.
    # In producer's window
    [root@hcptstkafka1 kafka]# kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-10
    >Sending a message to the second topic - test-topic-10!
    >yeahboii!
    >over and out!
    
    # In consumer's window.
    # Note that each line of message arrives in real-time.
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:90192 \
    --topic test-topic-10                                                         sr: 0
    Sending a message to the second topic - test-topic-10!                           sr: 0
    yeahboii!
    over and out!
    To read the total messages sent to the topic, we can append --from-beginning. To see the messages we sent to test-topic-1 earlier,
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1 \
    --from-beginning
    
    Alright!
    I should get confirmation from the topic
    This is the first message that we sent to a topic!
    This is my second batch of message sent to the topic.
    Note that when you get the messages, it is not retrieved in order. Order is not guaranteed if you have multiple partitions. This is because when the producer send messages to the topic, the messages are distributed across the partitions.
    You can have a guaranteed ordering of messages retrieved if you have a single-partition topic.
    Reading Messages from specific partitions and offsets
    We can also specify the partition that we want to read by adding the --partition parameter followed the partition number.
    Let's first produce some messages to the test-eden-1 topic
    [root@hcptstkafka1 ~]# kafka-console-producer.sh \
    --topic test-eden-1 \
    --bootstrap-server localhost:9092
    
    >This topic will contain a list of names
    >Barney
    >Marshall
    >Ted
    >Robin
    >Lily
    >Sheldon
    >Rajesh
    >Penny
    >Leonard
    >Howard
    >Bernadette
    >Amy
    Now let's open a second terminal and read the messages, starting from the beginning. Again, the messages will not appear in the same order that they were sent.
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh \
    --topic test-eden-1 \
    --from-beginning \
    --bootstrap-server localhost:9092
    
    This topic will contain a list of names
    Marshall
    Lily
    Rajesh
    Leonard
    Amy
    
    Barney
    Ted
    Sheldon
    Penny
    Bernadette
    Robin
    Howard
    This topic was created with 3 partitions. If we want to view only the messages that went to partition 1, we can do the following:
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
    --partition 1 \
    --from-beginning \
    --bootstrap-server localhost:9092
    
    This topic will contain a list of names
    Marshall
    Lily
    Rajesh
    Leonard
    Amy
    Similarly, we can read the messages in the same partition but starting from a particular offset. Note that if you want to specify an offset, remove the --from-beginning parameter.
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
    --partition 1 \
    --offset 2\
    --bootstrap-server localhost:9092
    
    Lily
    Rajesh
    Leonard
    Amy
    QUESTION: Can I read messages starting from an offset and do this on different partitions?
    If we try query all partitions and display all the messages beginning from a specific offset, we will get an error.
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
    --offset 2\
    --bootstrap-server localhost:9092
    
    The partition is required when offset is specified.
    Running Multiple Consumers
    Let's now try to run a producer and two consumers. Let's create test-topic-10 for this example.
    # Create new topic
    [root@hcptstkafka1 kafka]# kafka-topics.sh \
    --create \
    --topic test-topic-10 \
    --partitions 3 \
    --replication-factor 1 \
    --bootstrap-server localhost:9092
    Created topic test-tina-1.
    On the first and second terminal, run the consumer 1 and consumer 2:
    # Runs consumer 1 on terminal 1
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh \
    --topic test-topic-10 \
    --bootstrap-server localhost:9092
    # Runs consumer 2 on terminal 2
    [root@hcptstkafka1 ~]# kafka-console-consumer.sh \
    --topic test-topic-10 \
    --bootstrap-server localhost:9092
    On the third terminal, run the producer. Then start sending messages.
    [root@hcptstkafka1 kafka]# kafka-console-producer.sh \
    --topic test-topic-10 \
    --bootstrap-server localhost:9092
    >10
    >20
    >30
    >40
    >50
    >60
    Go back to the first two terminals. You'll see the messages coming in.
    Consumer Group
    If we have multiple producers sending messages to a single consumer, the rate of incoming messages may overwhelm the consumer and the consumer may not be able to keep up and fall behind. To resolve this, we can split the data between two consumers that are both subscribed to the same topic.
    We've done this previously, but this time we'll utilize Consumer groups. But what exactly is the difference between using multiple consumers vs. using consumer groups?
    Using multiple consumers
    As we've seen previously, two consumers subscribed to the same topic we'll receive the same messages that's being sent by the producers. It's not really doing any parallel consumption since each consumer is treated as an entirely different application.
    Using consumer groups
    As we will see, the messages that arrives at the consumer groups are load-balanced between the consumers. Consumers registered to the same consumer group will have the same group id.
    Ideally, the number of consumers within a consumer group should be equal to the number of partitions that a topic has.
    Photo taken from the Kafka: The Definitive Guide book
    If there are more consumers than the existing partitions, the excess consumers will go idle and will just waste resources.
    Photo taken from the Kafka: The Definitive Guide book
    How does Kafka distribute the partitions among the consumers?
    At the beginning, a consumer group subscribes to the topic then Kafka "maps" the topic to the group-id of the consumer group. Kafka then checks if any of the existing consumers have the mapped group-id.
    If there are two consumers with the same group-id, meaning both are part of the same consumer group, then the topic's partitions are balanced between the two. Note that these consumers cannot have the same assigned partitions.
    It is also possible to have multiple consumer groups subscribed to the same topic. This is the case if a second consumer group is performing tasks that are different with the first consumer groups.
    To register consumers to a consumer group, we can use the parameter --group when we run consumers.
    As mentioned, if we have multiple consumers in a consumer group retrieving messages from a topic, the retrieved messages will be load-balanced between the two consumers. This means some messages will go to consumer 1 while some will go to consumer 2.
    To prove this, open three terminals: one for the producer and two for the consumers. First, ran the kafka-console-consumerwith --group on both consumers.
    # Run  command in two terminals for the consumers
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1 \
    --group my-awesome-app-group
    
    # Run command in terminal for producer
    [root@hcptstkafka1 kafka]# kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1
    Then on the producer terminal, run kafka-console-producer and begin sending messages. You'll see that messages will be distributed between the two consumers.
    Now, recall that when a consumer retrieves a message from the topic, it commits the offset. When you use --from-beginning with the kafka-console-consumer and specified the --group, you'll be able to see all of the messages sent to the topic.
    If you try running the same command again with the --from-beginning, it will return nothing. This is because the first time this command was run, all the messages from the start have been committed.
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1  \
    --group my-awesome-app-group \
    --from-beginning
    
    let try sending messages! which consumer will receive this color?
    consumer will receive this color?
    red
    orange
    yellow
    green 
    blue  
    
    # When you rerun the command again, it returns nothing.
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1  \
    --group my-awesome-app-group \
    --from-beginning
    kafka-consumer-groups
    These commands can be used to list, create, and basically do a lot of stuff with consumer groups. To list the consumer groups,
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 --list
    
    my-awesome-app-group
    To get the details of the consumer groups, append with --describe and specify the group.
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --describe 
    
    Consumer group 'my-awesome-app-group' has no active members.
    
    GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    my-awesome-app-group test-topic-1    2          9               11              2               -               -               -
    my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
    my-awesome-app-group test-topic-1    0          19              20              1               -               -               -
    Notice the column for lag. This specifies how many messages in the specific partition that are still not retrieved and processed by the consumer group. To make sure the consumer group is caught up with all the messages, run the kafka-console-consumer again.
    [root@hcptstkafka1 kafka] kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1
    
    violet
    white 
    black
    Now when you try to ran the describe on the consumer group, the lag will now be zero.
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --describe 
    
    Consumer group 'my-awesome-app-group' has no active members.
    
    GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    my-awesome-app-group test-topic-1    2          11              11              0               -               -               -
    my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
    my-awesome-app-group test-topic-1    0          20              20              0               -               -               -
    Resetting offsets
    As previously mentioned, running the kafka-console-consumer.sh and specifying the group will read from the offset - which is the last position in the partition where it left off.
    There's actually an option to reset the offset and specify where we want in the partition/s to start reading data, which is in a way replaying the data.
    When we ran just kafka-consumer-group command without any parameter, it will not push through and instead will return the available parameters that you can append to the command.
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh 
    
    # Some parts of the output omitted
    --reset-offsets                         Reset offsets of consumer group.
                                              Supports one consumer group at the
                                              time, and instances should be
                                              inactive
                                            Has 2 execution options: --dry-run
                                              (the default) to plan which offsets
                                              to reset, and --execute to update
                                              the offsets. Additionally, the --
                                              export option is used to export the
                                              results to a CSV format.
                                            You must choose one of the following
                                              reset specifications: --to-datetime,
                                              --by-period, --to-earliest, --to-
                                              latest, --shift-by, --from-file, --
                                              to-current.
                                            To define the scope use --all-topics
                                              or --topic. One scope must be
                                              specified unless you use '--from-
                                              file'.
    Let's try to reset the offset to the beginning by using --reset-offsets and specify that we want to start at the beginning--to-earliest. We also need to define if we want reset the offset for just one topic or for all topics.
    Lastly, we just want a preview/dry-run and not to actually reset the offset so we'll also append --execute
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --reset-offsets --to-earliest \
    --topic test-topic-1 \
    --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET
    my-awesome-app-group           test-topic-1                   0          0
    my-awesome-app-group           test-topic-1                   1          0
    my-awesome-app-group           test-topic-1                   2          0
    Now if we try to retrieve the messages from test-topic-1, we'll be able to see all the messages from the beginning.
    [root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test-topic-1 \
    --group my-awesome-app-group
    
    Alright!
    I should get confirmation from the topic
    the first
    ahh right,
    orange
    orange
    green
    
    This is the first message thatwe sent to a topic!
     This is my second batch of message sent to the topic.
    Let's see where this will go
    now lets try colors
    yellow
    red
    violet
    
    Hello world!
    Awesome job on this!
    which consumer
    or second
    its distributes
    red\
    green
    blue
    let try sending messages! which consumer will receive this color?
    yellow
    blue
    white
    We can also move the offset forward or backward by using --shift-by and append a positive number to advance forward with the number of steps or a negative number to move backward.
    # Moving the offset three steps forward
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --reset-offsets \
    --shift-by 3 \
    --topic test-topic-1 \
    --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    my-awesome-app-group           test-topic-1                   0          3
    my-awesome-app-group           test-topic-1                   1          3
    my-awesome-app-group           test-topic-1                   2          3
    
    # Moving the offset another step.
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --reset-offsets \
    --shift-by 1 \
    --topic test-topic-1 \
    --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
    my-awesome-app-group           test-topic-1                   0          4
    my-awesome-app-group           test-topic-1                   1          4
    my-awesome-app-group           test-topic-1                   2          4
    
    # Moving the offset three steps backward.
    [root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-awesome-app-group \
    --reset-offsets \
    --shift-by -3 \
    --topic test-topic-1 \
    --execute
    
    GROUP                          TOPIC                          PARTITION  NEW-OFFSET
    my-awesome-app-group           test-topic-1                   0          1
    my-awesome-app-group           test-topic-1                   1          1
    my-awesome-app-group           test-topic-1                   2          1
    Troubleshooting
    Kafka Socket server failed - Bind Exception
    This means a port conflict issue.
    ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use
    To resolve, you can either run KafkaServer id=0 on a different port (like 9093) or find the processing listening to the port and kill it (easiest).
    To find the PID, you can either use netstat or lsof.
    [root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
    tcp6       0      0 :::9092                 :::*                    LISTEN      23999/java
    [root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
    java    23999 root  134u  IPv6  52852      0t0  TCP *:XmlIpcRegSvc (LISTEN)
    
    # send a kill signal
    [root@hcptstkafka1 kafka]# kill -9 23999
    
    # process should now disappear
    [root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
    [root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
    
    # retry running kafka
    [root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties
    Other CLI options
    We've discussed that messages will be randomly distributed to the partitions when the producer sends them. A way to ensure that messages will always go to a specific partition through the use of message keys. This can also be utilized by consumers to retrieve messages from a specific partition.
    Producer with keys
    kafka-console-producer  \
    --broker-list 127.0.0.1:9092  \
    --topic first_topic  \
    --property parse.key=true  \
    --property key.separator=,
    > key,value
    > another key,another value
    Consumer with keys
    kafka-console-consumer \
    --bootstrap-server 127.0.0.1:9092 \
    --topic first_topic --from-beginning  \
    --property print.key=true  \
    --property key.separator=,
    Cheat Sheet
    This simple and easy cheat sheet of commands is from Bogdan Stashchuk's Github page for his Apache Kafka Course.
    Basic KAFKA Commands
    
    START ZOOKEEPER
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    START KAFKA BROKER
    bin/kafka-server-start.sh config/server.properties
    
    CREATE TOPIC
    bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --create \
    --replication-factor 1 \
    --partitions 3 \
    --topic test
    
    LIST TOPICS
    bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --list
    
    TOPIC DETAILS
    bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --topic test
    
    START CONSOLE PRODUCER
    bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic test
    
    START CONSOLE CONSUMER
    bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test
    
    START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
    bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test \
    --from-beginning
    
    START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
    bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test \
    --group test \
    --from-beginning
    
    LIST CONSUMER GROUPS
    bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list
    
    CONSUMER GROUP DETAILS
    bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group test \
    --describe
    Resources
    I know this write-up is lengthy and I'll probably keep coming back and read this again, but if you're interested to learn more, all my notes are based on the following courses.
    Besides this, you can also try out another open-source alternative of Kafka CLI, KafkaCat. This one I might try once I'm done with the course.
    If you've enjoyed my notes or if they somehow brought some value, I'll be glad to connect with you on Twitter!. 😃

    34

    This website collects cookies to deliver better user experience

    Notes on Kafka: Kafka CLI Commands