19
Notes on Kafka: Kafka CLI Commands
These section contains useful commands in running Kafka. note that I encountered some errors while playing around with the commands. I've included the errors at the bottom and also how I fixed them. The contents of this section are:
For a quick-read cheat sheet, you may also jump ahead to the bottom of this page, to the Cheat Sheet section.
We initially installed v2.7.0
in our machine when we discussed Kafka installation on EC2, but the following commands is helpful if you'll be working on Kafka inrastructure that's been built by others. The latest release and the current stable version as of this writing is v2.8.0
.
To check the version
[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
Note that you must have have both Zookeeper and Kafka running before creating a topic. You may check out the previous write-up for a single-node (single-broker) Kafka cluster, Kafka on EC2. I highly suggest you go through the previous one since this article build upon the knowledge gain from the previous article.
Similarly, you can open three terminals. Start the Zookeeper first in the first terminal, and then Kafka on the second terminal using the command below. Then all other operations you can do on the third terminal.
# On terminal 1
zookeeper-server-start.sh /$KAFKA-HOME/zookeeper.properties
# On terminal 2
kafka-server-start.sh /$KAFKA-HOME/server.properties
To create a topic, we need to specify:
kafka-topics.sh
-
--create --topic
, followed by the topic name -
--partition
, followed by how many partition you want, -
--replication-factor
, followed by a number which should be equal or less than the number of brokers. -
--zookeeper
, followed by local machine's and port 2181 - as of v2.2, we use
--bootstrap-server
which runs on port9092
instead.
In the example below, we're creating two topics using the deprecated way and the new approach. Note that localhost
and 127.0.0.1
can be used interchangeably.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--topic test-topic-1 --create \
--partitions 3 --replication-factor 1 \
--zookeeper 127.0.0.1:2181
Created topic test-topic-1.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-2 \
--partitions 4 --replication-factor 1 \
--bootstrap-server 127.0.0.1:9092
Created topic test-topic-2.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-3 \
--partitions 5 --replication-factor 1 \
--zookeeper localhost:2181
Created topic test-topic-3.
To list the topics,
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
test-topic-2
test-topic-3
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--zookeeper localhost:2181
test-topic-1
test-topic-2
test-topic-3
To delete a topic, run the command/s below. Note that when you run the --delete
with the new approach, no output is returned.
On the other hand, using the deprecated method with the --delete
will return a message saying that the topic is is marked for deletion.
Either methods will work and the topics deleted will not appear anymore when you try to list the topics again. Let's try to delete test-topic-2 and test-topic-3
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-2 \
--zookeeper localhost:2181
Topic test-topic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-3 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
To get more details about a certain topic,
[root@hcptstkafka1 ~]# kafka-topics.sh \
--describe --topic test-topic-1 \
--bootstrap-server localhost:9092
Topic: test-topic-1 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Let's create two more example topics.
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-eden-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-eden-1.
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-tina-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.
Going back to the our $KAFKA-HOME folder, which is where are Kafka package was previously installed, we can check the new logs created in the data/kafka folder.
Here we can see logs were created for the first topic, test-topic-1 and for the two new topics. test-eden-1 and test-tina-1. Note that each partition will also have a log.
Since we set three partitions for each topic, and there are three topics in total, we should see 9 new logs.
[root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
[root@hcptstkafka1 kafka]# ll
total 20
-rw-r--r-- 1 root root 4 Aug 15 07:25 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Aug 15 07:38 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Aug 15 06:13 meta.properties
-rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
[root@hcptstkafka1 kafka]#
[root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
test-eden-1
test-tina-1
test-topic-1
To run this command, the following is required
- specify brokers using
broker-list
- deprecated - as replacement to brokers-list, specify
bootstrap-server
- specify topic using
topic
When you run the command with the required parameters, it should return a caret (>). You can now send a message to the topic. To exit, hit Ctrl-c.
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-1
>Hello world!
>This is the first message that we sent to a topic! D D
>^Z
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--bootstrap-server localhost:9092
>Awesome job on this!
>Alright!
>^Z
When you produce a message and specified a topic that is still not create, it will return an error saying "LEADER_NOT_AVAILABLE". You'll still be allowed to type in a message until you exit out.
What happens here is the producer tries to send a message to a non-existing topic but was greeted with a warning. However, the producer is smart enough to recover and wait for the Kafka topic to be created. Once the topic is created, you can continue sending messages to the now-created topic.
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 --topic test-topic-10
> Third set of messages!
[2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Wait there's error but I'm still able to type in a message
>^Z
# the new topic should now appear when you try to list the topics
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
The topic is created with the default values. You can see when you try to run describe
.
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-10 \
--bootstrap-server localhost:9092
Topic: test-topic-10 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-10 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
The topic will default to 1 partition and a replication factor of 1. This isn't best practice thus it is recommended to ensure that the destination topics are created.
You can also change this default value by editing the config/server.properties
[root@hcptstkafka1 ~] pwd
/usr/local/bin/kafka
[root@hcptstkafka1 kafka]
[root@hcptstkafka1 kafka] vi config/server.properties
## Some of the output omitted ##
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/bin/kafka/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4
Whenever we edit the properties file, we'll need to stop the Kafka service and then run it again for the changes to take effect. On the terminal where kafka-server-start.sh
was run, hit Ctrl-C then run the command again.
Now if we try to send a message again to a non-existing, the topic will be created with the new default value.
# Send the message
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-100
>Another message sent!
[2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Oops! But hey, still sending message!
>^Z
# List the topics and get the details of the newly-created topic.
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
test-topic-100
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-100 \
--bootstrap-server localhost:9092
Topic: test-topic-100 PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-100 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Recall that producers can choose to accept acknowledgements from the topic that it has received the message. This is done through the acks.
We'll send a message again but this time we specify that we want confirmation from the topic.
[root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
> This is my second batch of message sent to the topic.
>I should get confirmation from the topic
Here we'll verify the messages sent to the topics. Similar with the producer, you also need to specify some parameters:
bootstrap-server
topic
However, it will only read the real-time messages that was sent AFTER kafka-console-consumer
is ran.
If we try to run kafka-console-consumer
and then run kafka-console-producer
on another window and send a message again to the topic, the console consumer will now be able to see it in real-time.
# In producer's window
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
>Sending a message to the second topic - test-topic-10!
>yeahboii!
>over and out!
# In consumer's window.
# Note that each line of message arrives in real-time.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:90192 \
--topic test-topic-10 sr: 0
Sending a message to the second topic - test-topic-10! sr: 0
yeahboii!
over and out!
To read the total messages sent to the topic, we can append --from-beginning
. To see the messages we sent to test-topic-1 earlier,
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--from-beginning
Alright!
I should get confirmation from the topic
This is the first message that we sent to a topic!
This is my second batch of message sent to the topic.
Note that when you get the messages, it is not retrieved in order. Order is not guaranteed if you have multiple partitions. This is because when the producer send messages to the topic, the messages are distributed across the partitions.
You can have a guaranteed ordering of messages retrieved if you have a single-partition topic.
We can also specify the partition that we want to read by adding the --partition
parameter followed the partition number.
Let's first produce some messages to the test-eden-1 topic
[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--topic test-eden-1 \
--bootstrap-server localhost:9092
>This topic will contain a list of names
>Barney
>Marshall
>Ted
>Robin
>Lily
>Sheldon
>Rajesh
>Penny
>Leonard
>Howard
>Bernadette
>Amy
Now let's open a second terminal and read the messages, starting from the beginning. Again, the messages will not appear in the same order that they were sent.
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-eden-1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
Barney
Ted
Sheldon
Penny
Bernadette
Robin
Howard
This topic was created with 3 partitions. If we want to view only the messages that went to partition 1, we can do the following:
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
Similarly, we can read the messages in the same partition but starting from a particular offset. Note that if you want to specify an offset, remove the --from-beginning
parameter.
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--offset 2\
--bootstrap-server localhost:9092
Lily
Rajesh
Leonard
Amy
QUESTION: Can I read messages starting from an offset and do this on different partitions?
If we try query all partitions and display all the messages beginning from a specific offset, we will get an error.
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--offset 2\
--bootstrap-server localhost:9092
The partition is required when offset is specified.
Let's now try to run a producer and two consumers. Let's create test-topic-10 for this example.
# Create new topic
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create \
--topic test-topic-10 \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.
On the first and second terminal, run the consumer 1 and consumer 2:
# Runs consumer 1 on terminal 1
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
# Runs consumer 2 on terminal 2
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
On the third terminal, run the producer. Then start sending messages.
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
>10
>20
>30
>40
>50
>60
Go back to the first two terminals. You'll see the messages coming in.
If we have multiple producers sending messages to a single consumer, the rate of incoming messages may overwhelm the consumer and the consumer may not be able to keep up and fall behind. To resolve this, we can split the data between two consumers that are both subscribed to the same topic.
We've done this previously, but this time we'll utilize Consumer groups. But what exactly is the difference between using multiple consumers vs. using consumer groups?
Using multiple consumers
As we've seen previously, two consumers subscribed to the same topic we'll receive the same messages that's being sent by the producers. It's not really doing any parallel consumption since each consumer is treated as an entirely different application.
Using consumer groups
As we will see, the messages that arrives at the consumer groups are load-balanced between the consumers. Consumers registered to the same consumer group will have the same group id.
Ideally, the number of consumers within a consumer group should be equal to the number of partitions that a topic has.
If there are more consumers than the existing partitions, the excess consumers will go idle and will just waste resources.
How does Kafka distribute the partitions among the consumers?
At the beginning, a consumer group subscribes to the topic then Kafka "maps" the topic to the group-id of the consumer group. Kafka then checks if any of the existing consumers have the mapped group-id.
If there are two consumers with the same group-id, meaning both are part of the same consumer group, then the topic's partitions are balanced between the two. Note that these consumers cannot have the same assigned partitions.
It is also possible to have multiple consumer groups subscribed to the same topic. This is the case if a second consumer group is performing tasks that are different with the first consumer groups.
To register consumers to a consumer group, we can use the parameter --group
when we run consumers.
As mentioned, if we have multiple consumers in a consumer group retrieving messages from a topic, the retrieved messages will be load-balanced between the two consumers. This means some messages will go to consumer 1 while some will go to consumer 2.
To prove this, open three terminals: one for the producer and two for the consumers. First, ran the kafka-console-consumer
with --group
on both consumers.
# Run command in two terminals for the consumers
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
# Run command in terminal for producer
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
Then on the producer terminal, run kafka-console-producer
and begin sending messages. You'll see that messages will be distributed between the two consumers.
Now, recall that when a consumer retrieves a message from the topic, it commits the offset. When you use --from-beginning
with the kafka-console-consumer
and specified the --group
, you'll be able to see all of the messages sent to the topic.
If you try running the same command again with the --from-beginning
, it will return nothing. This is because the first time this command was run, all the messages from the start have been committed.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
let try sending messages! which consumer will receive this color?
consumer will receive this color?
red
orange
yellow
green
blue
# When you rerun the command again, it returns nothing.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
These commands can be used to list, create, and basically do a lot of stuff with consumer groups. To list the consumer groups,
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list
my-awesome-app-group
To get the details of the consumer groups, append with --describe
and specify the group.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 9 11 2 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 19 20 1 - - -
Notice the column for lag. This specifies how many messages in the specific partition that are still not retrieved and processed by the consumer group. To make sure the consumer group is caught up with all the messages, run the kafka-console-consumer
again.
[root@hcptstkafka1 kafka] kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
violet
white
black
Now when you try to ran the describe
on the consumer group, the lag will now be zero.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 11 11 0 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 20 20 0 - - -
As previously mentioned, running the kafka-console-consumer.sh
and specifying the group will read from the offset - which is the last position in the partition where it left off.
There's actually an option to reset the offset and specify where we want in the partition/s to start reading data, which is in a way replaying the data.
When we ran just kafka-consumer-group
command without any parameter, it will not push through and instead will return the available parameters that you can append to the command.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh
# Some parts of the output omitted
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-period, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
Let's try to reset the offset to the beginning by using --reset-offsets
and specify that we want to start at the beginning--to-earliest
. We also need to define if we want reset the offset for just one topic or for all topics.
Lastly, we just want a preview/dry-run and not to actually reset the offset so we'll also append --execute
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets --to-earliest \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 0
my-awesome-app-group test-topic-1 1 0
my-awesome-app-group test-topic-1 2 0
Now if we try to retrieve the messages from test-topic-1
, we'll be able to see all the messages from the beginning.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
Alright!
I should get confirmation from the topic
the first
ahh right,
orange
orange
green
This is the first message thatwe sent to a topic!
This is my second batch of message sent to the topic.
Let's see where this will go
now lets try colors
yellow
red
violet
Hello world!
Awesome job on this!
which consumer
or second
its distributes
red\
green
blue
let try sending messages! which consumer will receive this color?
yellow
blue
white
We can also move the offset forward or backward by using --shift-by
and append a positive number to advance forward with the number of steps or a negative number to move backward.
# Moving the offset three steps forward
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 3
my-awesome-app-group test-topic-1 1 3
my-awesome-app-group test-topic-1 2 3
# Moving the offset another step.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 1 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 4
my-awesome-app-group test-topic-1 1 4
my-awesome-app-group test-topic-1 2 4
# Moving the offset three steps backward.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by -3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 1
my-awesome-app-group test-topic-1 1 1
my-awesome-app-group test-topic-1 2 1
Kafka Socket server failed - Bind Exception
This means a port conflict issue.
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use
To resolve, you can either run KafkaServer id=0 on a different port (like 9093) or find the processing listening to the port and kill it (easiest).
To find the PID, you can either use netstat
or lsof
.
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
tcp6 0 0 :::9092 :::* LISTEN 23999/java
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
java 23999 root 134u IPv6 52852 0t0 TCP *:XmlIpcRegSvc (LISTEN)
# send a kill signal
[root@hcptstkafka1 kafka]# kill -9 23999
# process should now disappear
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
# retry running kafka
[root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties
We've discussed that messages will be randomly distributed to the partitions when the producer sends them. A way to ensure that messages will always go to a specific partition through the use of message keys. This can also be utilized by consumers to retrieve messages from a specific partition.
Producer with keys
kafka-console-producer \
--broker-list 127.0.0.1:9092 \
--topic first_topic \
--property parse.key=true \
--property key.separator=,
> key,value
> another key,another value
Consumer with keys
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic --from-beginning \
--property print.key=true \
--property key.separator=,
This simple and easy cheat sheet of commands is from Bogdan Stashchuk's Github page for his Apache Kafka Course.
Basic KAFKA Commands
START ZOOKEEPER
bin/zookeeper-server-start.sh config/zookeeper.properties
START KAFKA BROKER
bin/kafka-server-start.sh config/server.properties
CREATE TOPIC
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--replication-factor 1 \
--partitions 3 \
--topic test
LIST TOPICS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
TOPIC DETAILS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic test
START CONSOLE PRODUCER
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
START CONSOLE CONSUMER
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test
START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--group test \
--from-beginning
LIST CONSUMER GROUPS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
CONSUMER GROUP DETAILS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group test \
--describe
I know this write-up is lengthy and I'll probably keep coming back and read this again, but if you're interested to learn more, all my notes are based on the following courses.
Apache Kafka Series - Learn Apache Kafka for Beginners v2 by Stephane Maarek
Getting Started with Apache Kafka by Ryan Plant
Apache Kafka A-Z with Hands on Learning by Learnkart Technology Private Limited
The Complete Apache Kafka Practical Guide by Bogdan Stashchuk
Besides this, you can also try out another open-source alternative of Kafka CLI, KafkaCat. This one I might try once I'm done with the course.
If you've enjoyed my notes or if they somehow brought some value, I'll be glad to connect with you on Twitter!. 😃
19