Publish/Subscribe pattern example (Redis, Kafka)

Hi, in this tutorial I'm going to show you how to implement this pattern. I won't go into the details of Kafka or Redis. The main idea is to show you how to build a simple implementation with each of these technologies.

For this tutorial we are going to use Docker. docker-compose is required if you want to follow along.

Explanation

The publish/subscribe pattern is very easy to understand. The main idea is to have a central point where applications put information, which is then consumed by other applications. You can see it in the image below.

Some terms change depending on the technology we're using. For example, Redis calls the category that messages are published to a channel, while Kafka calls it a topic.

Topics or channels are the way to classify information. For example, suppose we want to save information about the login process. We can create a topic (or channel) called login_info and another called transaction_payment to save the transaction information of a payment.

Before we start, we need to install some dependencies.
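
To make the idea of channels concrete before bringing in Redis or Kafka, here is a minimal in-memory sketch. It uses Node's built-in EventEmitter as a stand-in for the central point. The subscribe and publish helpers and the sample payloads (user, amount) are assumptions made up for illustration, not part of either library.

```javascript
const { EventEmitter } = require('events')

// In-memory stand-in for the central point (Redis or Kafka in this tutorial).
const broker = new EventEmitter()

// A channel/topic is just a name; here it maps to an event name.
function subscribe(channel, handler) {
  broker.on(channel, handler)
}

function publish(channel, payload) {
  // Messages travel as strings, so the payload is serialized first.
  broker.emit(channel, JSON.stringify(payload))
}

const received = []
subscribe('login_info', (message) => received.push(`login_info: ${message}`))
subscribe('transaction_payment', (message) => received.push(`transaction_payment: ${message}`))

// Each message only reaches the subscribers of its own channel.
publish('login_info', { user: 'alice' })
publish('transaction_payment', { amount: 10 })

console.log(received)
```

The publisher never needs to know who is listening. It only needs the channel name, which is the same decoupling Redis and Kafka provide across processes.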

npm install express ioredis kafkajs

docker-compose (Redis)

We need to create a docker-compose.yml file with this content.

version: "3.9"
services:
  redis-service:
    image: "redis:alpine"
    container_name: redis-service
    ports:
      - "6379:6379"

Publisher (Redis)

publisher.js. You could also create an endpoint that sends the information on demand. For me, publishing once when the server starts works for this example.

const express = require('express')
const Redis = require('ioredis')

const app = express()
const port = 3000
const redis = new Redis()

app.listen(port, async () => {
  const message = { message: 'message from redis' }
  await redis.publish('test', JSON.stringify(message))
  console.log(`Example app (publisher) at http://localhost:${port}`)
})
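
If you prefer the endpoint approach mentioned above, the following is one possible sketch. The createPublishHandler helper and the /publish route are hypothetical names chosen for this example. The handler only assumes a client with a publish(channel, message) method, such as the ioredis instance from publisher.js.

```javascript
// Builds an Express-style handler. `redis` is any client exposing
// publish(channel, message), for example the ioredis instance above.
function createPublishHandler(redis, channel) {
  return async (req, res) => {
    try {
      // PUBLISH resolves to the number of subscribers that received the message.
      const receivers = await redis.publish(channel, JSON.stringify(req.body))
      res.status(200).json({ receivers })
    } catch (err) {
      res.status(500).json({ error: err.message })
    }
  }
}

// Hypothetical wiring inside publisher.js (the route name is an assumption):
// app.use(express.json())
// app.post('/publish', createPublishHandler(redis, 'test'))
```

With this in place, every POST to the route publishes the request body to the channel, instead of publishing a single message at startup.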

Subscriber (Redis)

subscriber.js

const express = require('express')
const Redis = require('ioredis')

const app = express()
const port = 3500

const redis = new Redis()

app.listen(port, async () => {
  await redis.subscribe('test')

  redis.on('message', (channel, message) => {
    console.log(`Received ${message} from ${channel} (Redis)`)
  })

  console.log(`Example app (subscribe) at http://localhost:${port}`)
})

Now we need to run the following commands.

$ docker-compose up -d

$ node subscriber.js

$ node publisher.js # open in another terminal

You should now see the message in the subscriber's terminal. That was very easy =). Now we can follow the same process to implement Kafka.

docker-compose (Kafka)

To implement Kafka we need to download a docker-compose.yml file from the Confluent quickstart page: https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html.

We could use the entire docker-compose file with all of its services, but that is not necessary for this tutorial. The essential services to start working with Kafka are zookeeper and broker.

version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

Publisher (Kafka)

const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3000


const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

app.listen(port, async () => {
  const producer = kafka.producer()
  await producer.connect()
  await producer.send({
    topic: 'test',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })

  await producer.disconnect()

  console.log(`Example app (publisher) at http://localhost:${port}`)
})
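
The Redis publisher sent a JSON string, while the Kafka publisher above sends plain text. If you want to send the same kind of object through Kafka, one option is a pair of small helpers like the ones sketched below. The names toKafkaMessage and fromKafkaMessage are made up for this example. The sketch relies on KafkaJS accepting a string as the message value and delivering it to the consumer as a Buffer.

```javascript
// Producer side: serialize the object into the message value.
function toKafkaMessage(payload) {
  return { value: JSON.stringify(payload) }
}

// Consumer side: message.value arrives as a Buffer, so decode and parse it.
function fromKafkaMessage(message) {
  return JSON.parse(message.value.toString())
}

// Possible usage with the producer above:
// await producer.send({
//   topic: 'test',
//   messages: [toKafkaMessage({ message: 'message from kafka' })],
// })

// Local round trip that simulates what a consumer would receive.
const sent = toKafkaMessage({ message: 'message from kafka' })
const delivered = { value: Buffer.from(sent.value) }
console.log(fromKafkaMessage(delivered))
```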

Subscriber (Kafka)

Before I begin, I need to clarify that this topic-creation code can live wherever you want.

const admin = kafka.admin()

try {
  await admin.connect()

  await admin.createTopics({
    waitForLeaders: true,
    topics: [{ topic: 'test' }],
  })
} catch (err) {
  console.error(err)
} finally {
  await admin.disconnect()
}

Kafka needs some setup before we start. For example, the topic must be created beforehand. Unlike Redis, Kafka has many more settings. I put this code inside subscriber.js. If you run it more than once, Kafka reports an error because the topic already exists.

const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3500

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

app.listen(port, async () => {
  const admin = kafka.admin()

  try {
    await admin.connect()

    await admin.createTopics({
      waitForLeaders: true,
      topics: [{topic: 'test' }],
    })
  } catch(err) {
    console.error(err)
  } finally {
    await admin.disconnect()
  }

  // Kafka consumer: join a consumer group and read the topic from the beginning
  const consumer = kafka.consumer({ groupId: 'test-group' })
  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Received ${message.value.toString()} from channel ${topic} (Kafka)`)
    },
  })

  console.log(`Example app (subscribe) at http://localhost:${port}`)
})
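
One way to avoid the duplicate-topic error on repeated runs is to check which topics exist before creating one. The sketch below assumes a connected KafkaJS admin client and uses its listTopics and createTopics methods. The ensureTopic helper itself is a hypothetical name for this example, and the check is not atomic, so two processes starting at the same moment could still race.

```javascript
// `admin` is expected to be a connected KafkaJS admin client (kafka.admin()).
// Resolves to true if the topic was created, false if it already existed.
async function ensureTopic(admin, topic) {
  const existing = await admin.listTopics()
  if (existing.includes(topic)) {
    return false // nothing to do, the topic is already there
  }
  await admin.createTopics({
    waitForLeaders: true,
    topics: [{ topic }],
  })
  return true
}

// Possible usage inside subscriber.js, replacing the createTopics call:
// await admin.connect()
// await ensureTopic(admin, 'test')
// await admin.disconnect()
```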

That's all we need to work with Kafka. Now we can run the same commands we saw before.

$ docker-compose up -d

$ node subscriber.js

$ node publisher.js # open in another terminal

I hope it was useful for you and that you can start working with these technologies. If you have any corrections, let me know.
