Dipping toes into Kafka and Node

Some upcoming work is on the horizon in which I'll need to work with something I've not had the pleasure of using before - Kafka.

I thought it best to try out Kafka on my own terms using technologies I'm comfortable with before going into the real world and breaking something in production.

Two thoughts knocked around:

  1. I wasn't concerned with configuring Kafka itself
  2. I wanted to use Node to communicate with Kafka

As I wasn't too concerned with the configuration of Kafka itself, I decided to use this handy blog post from baeldung.com which walked me through the steps needed to get an instance of Kafka running locally via Docker.

Task one was done ✅

With that out of the way it was time to start communicating with the Kafka instance. Kafka is a framework for streaming data: messages are published to Kafka grouped under topics. Different applications can subscribe to topics; these applications are called consumers. When a message comes into Kafka, it forwards the message on to the subscribed consumers for processing at their leisure.

This is what I wanted to try out today.

First, let's get a consumer up and running. I spun up a cheeky Node app using our friendly command npm init -y, and once that was done I installed our only dependency, KafkaJS, using npm install kafkajs.

Now the project was set up, I created a new file index.js and added the familiar script "start": "node index.js" to the package.json.
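For reference, the package.json ends up looking something like this (the name is whatever npm init generated and the kafkajs version is whatever npm installed at the time):

{
  "name": "consumer",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.4"
  }
}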

Let's see the code for the consumer:

const { Kafka } = require('kafkajs')

async function run() {
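    // Point the client at the local broker exposed by the Docker setup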
    const kafka = new Kafka({
        clientId: 'marvel-consumer',
        brokers: ['localhost:29092']
    })

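    // Consumers sharing a group id have the topic's messages shared out between them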
    const consumer = kafka.consumer({ groupId: 'marvel-consumers' })

    await consumer.connect()

    await consumer.subscribe({ topic: 'marvel' })

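    // Log each message received on the subscribed topic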
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log('marvel-consumer', {
                topic,
                partition,
                key: message.key.toString(),
                value: message.value.toString(),
                headers: message.headers,
            })
        },
    })
};

run();

The intention here was to encapsulate a consumer in its own simple application. This application listens for messages on the topic marvel and writes the published messages to the console.

To achieve this, first I needed to connect to my locally running Kafka instance. I also needed to give my consumer a group id - Kafka uses this to group consumers together. When consumers are grouped, not every consumer will receive each message, as Kafka uses round robin to share messages out across the group. This last point led to some head scratching for a few moments when I decided to use multiple consumers.

Grouping is there so we can set up multiple instances of the same consumer for resilience.
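A minimal sketch of the difference (the second group id here is made up for illustration):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
    clientId: 'marvel-consumer',
    brokers: ['localhost:29092']
})

// Same group id as the consumer above: Kafka shares the 'marvel' messages
// out between the running copies, so each message is processed only once.
const groupedConsumer = kafka.consumer({ groupId: 'marvel-consumers' })

// A different group id means this consumer receives every message,
// independently of the original.
const independentConsumer = kafka.consumer({ groupId: 'marvel-consumers-2' })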

Following this I then gave the consumer the topic it's interested in - for this consumer it was the topic 'marvel'.

I then wanted to try out multiple consumers to see how they function. To do this I simply copy-pasted the code already defined for the first consumer, making sure to update the group ids to ensure each consumer gets all the messages for the topics they're interested in.

In one of these extra consumers I subscribed just to the topic 'dc', and in the other I subscribed to both the 'marvel' and 'dc' topics using the below code:

const { Kafka } = require('kafkajs')

async function run() {
    const kafka = new Kafka({
        clientId: 'superhero-consumer',
        brokers: ['localhost:29092']
    })

    const consumer = kafka.consumer({ groupId: 'superhero-consumers' })

    await consumer.connect()

    await consumer.subscribe({ topic: 'marvel' })

    await consumer.subscribe({ topic: 'dc' })

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log('superhero-consumer', {
                topic,
                partition,
                key: message.key.toString(),
                value: message.value.toString(),
                headers: message.headers,
            })
        },
    })
};

run();
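Looking back, rather than copy-pasting the whole file for each variation, the shared setup could be pulled into a small helper - a rough sketch (the createConsumer name is my own invention):

const { Kafka } = require('kafkajs')

// Hypothetical helper: builds, connects and starts a consumer for the given topics
async function createConsumer(clientId, groupId, topics) {
    const kafka = new Kafka({ clientId, brokers: ['localhost:29092'] })
    const consumer = kafka.consumer({ groupId })

    await consumer.connect()

    for (const topic of topics) {
        await consumer.subscribe({ topic })
    }

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log(clientId, {
                topic,
                partition,
                key: message.key.toString(),
                value: message.value.toString(),
                headers: message.headers,
            })
        },
    })
}

createConsumer('superhero-consumer', 'superhero-consumers', ['marvel', 'dc'])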

After running npm start for each of these defined consumers I was ready to process messages, so I needed to create a producer (or two).

To do this I made a new directory for my producers, /producers, and ran through the same steps to create a new Node project as we did for the consumers. But this time the index.js saw some different code:

const { Kafka } = require('kafkajs')

async function run() {
    const kafka = new Kafka({
        clientId: 'dc-producer',
        brokers: ['localhost:29092']
    })

    const topic = 'dc'

    const producer = kafka.producer()

    await producer.connect()

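    // Publish a single keyed message to the 'dc' topic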
    await producer.send({
        topic,
        messages: [
            { key: 'batman', value: 'bruce wayne' },
        ],
    })

    // Disconnect cleanly once the message has been sent
    await producer.disconnect()

    process.exit(0);
}

run();

Here I run through the same setup as before: I connect to Kafka and then define a producer instead of a consumer. I then use the producer to send messages to Kafka for the given topic (in this instance that's 'dc').
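As a side note, KafkaJS can also publish to several topics in one call via producer.sendBatch - a minimal sketch, assuming the same connected producer as above (the heroes are just for illustration):

await producer.sendBatch({
    topicMessages: [
        { topic: 'dc', messages: [{ key: 'flash', value: 'barry allen' }] },
        { topic: 'marvel', messages: [{ key: 'spiderman', value: 'peter parker' }] },
    ],
})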

Running the code above with our trusty npm start command, I then saw messages pop into the superhero and 'dc' consumers at the same time.

I officially produced and consumed messages via Kafka! Task two was done ✅

I then duplicated my producer code to see some 'marvel' messages and was happy.
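That duplicate was just the same producer with the topic and message swapped - something like this (the hero is only an example):

const { Kafka } = require('kafkajs')

async function run() {
    const kafka = new Kafka({
        clientId: 'marvel-producer',
        brokers: ['localhost:29092']
    })

    const producer = kafka.producer()

    await producer.connect()

    await producer.send({
        topic: 'marvel',
        messages: [
            { key: 'ironman', value: 'tony stark' },
        ],
    })

    // Disconnect cleanly once the message has been sent
    await producer.disconnect()

    process.exit(0);
}

run();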

Hope you find something useful here - I was concerned before starting that this might lead down some complicated paths, but it certainly seems more intelligent people got here first and made it a lot simpler to get started with Kafka and Node.

References:

To achieve this I used the above-mentioned blog post from baeldung.com for running Kafka via Docker; the rest of my code was pieced together using the KafkaJS docs.
