Publish/Subscribe pattern example (Redis, Kafka)
Hi, in this tutorial I'm going to show you how to implement this pattern. I won't explain the details of Kafka or Redis; the main idea here is to show you how to build a simple implementation with these technologies.
For this tutorial we are going to use Docker and docker-compose; both are necessary if you want to follow along.
The publish/subscribe pattern is very easy to understand. The main idea is to have a central point where all the applications put their information, which is then consumed by other applications. You can see the image below.
Depending on the technology we're using, some of the terms change. For example, in Redis the way to classify our information is called a channel; in Kafka it is called a topic.
Topics or channels are the way to classify information. For example, to save information about the login process we could create a topic (or channel) called login_info, and another one called transaction_payment to save the transaction information of a payment.
Before starting we need to install some dependencies.
npm install express ioredis kafkajs
We need to create a docker-compose.yml and put this content in it.
version: "3.9"
services:
  redis-service:
    image: "redis:alpine"
    container_name: redis-service
    ports:
      - "6379:6379"
Now we create publisher.js. You could create an endpoint to send the information whenever you want; for me, this approach works.
const express = require('express')
const Redis = require('ioredis')

const app = express()
const port = 3000
const redis = new Redis() // connects to localhost:6379 by default

app.listen(port, async () => {
  // publish a JSON string to the "test" channel as soon as the app starts
  const message = { message: 'message from redis' }
  await redis.publish('test', JSON.stringify(message))
  console.log(`Example app (publisher) at http://localhost:${port}`)
})
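If you would rather publish on demand (the endpoint mentioned above), here is a minimal sketch; the /publish route and its JSON body are just illustrative assumptions:

const express = require('express')
const Redis = require('ioredis')

const app = express()
const port = 3000
const redis = new Redis()

app.use(express.json())

// POST /publish with a JSON body -- publishes the body to the "test" channel
app.post('/publish', async (req, res) => {
  await redis.publish('test', JSON.stringify(req.body))
  res.sendStatus(204)
})

app.listen(port, () => {
  console.log(`Example app (publisher) at http://localhost:${port}`)
})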
subscriber.js
const express = require('express')
const Redis = require('ioredis')

const app = express()
const port = 3500
const redis = new Redis()

app.listen(port, async () => {
  // subscribe to the "test" channel and log every message that arrives
  await redis.subscribe('test')
  redis.on('message', (channel, message) => {
    console.log(`Received ${message} from ${channel} (Redis)`)
  })
  console.log(`Example app (subscriber) at http://localhost:${port}`)
})
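Since publisher.js sends a JSON string, you will probably want to parse it back into an object inside the handler. A small sketch of the same handler with parsing (the try/catch is only a precaution against non-JSON payloads):

redis.on('message', (channel, message) => {
  try {
    const payload = JSON.parse(message) // { message: 'message from redis' }
    console.log(`Received`, payload, `from ${channel} (Redis)`)
  } catch (err) {
    console.error('Message was not valid JSON:', message)
  }
})

Also note that once an ioredis connection calls subscribe it switches into subscriber mode and can no longer run regular commands, which is why the publisher and the subscriber each use their own connection.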
Now we need to execute the following commands.
$ docker-compose up -d
$ node subscriber.js
$ node publisher.js // open in another terminal
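If everything is wired up correctly, the subscriber terminal should print something like the following (the payload is the one hard-coded in publisher.js):

Received {"message":"message from redis"} from test (Redis)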
That was very easy =). Now we can follow the same process to implement Kafka.
To implement Kafka we need to download a docker-compose.yml file from the Confluent page: https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html. We could use the entire docker-compose file with all its services, but that is not necessary for this tutorial. The fundamental services to start working with Kafka are zookeeper and broker.
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
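The broker can take a little while to become ready after docker-compose up -d. One way to check it, assuming the kafka-topics CLI that ships inside the Confluent broker image:

$ docker exec broker kafka-topics --bootstrap-server localhost:9092 --list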
publisher.js

const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3000
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

app.listen(port, async () => {
  // connect, send one message to the "test" topic, and disconnect
  const producer = kafka.producer()
  await producer.connect()
  await producer.send({
    topic: 'test',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })
  await producer.disconnect()
  console.log(`Example app (publisher) at http://localhost:${port}`)
})
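As with the Redis example, you can send structured data by serializing it yourself, since Kafka message values are just strings or bytes. A minimal sketch of the same send call with a JSON payload (the key and the field names are only an example):

await producer.send({
  topic: 'test',
  messages: [
    // the optional key determines which partition the message lands on
    { key: 'login', value: JSON.stringify({ user: 'alice', ok: true }) },
  ],
})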
Before continuing, I should clarify that the following piece of code could live anywhere you want.
const admin = kafka.admin()
try {
  await admin.connect()
  // create the "test" topic and wait until it has a leader
  await admin.createTopics({
    waitForLeaders: true,
    topics: [{ topic: 'test' }],
  })
} catch (err) {
  console.error(err)
} finally {
  await admin.disconnect()
}
Kafka needs some setup before it can start working; for example, the topics have to be created beforehand. Kafka has more configuration than Redis. I put this content inside subscriber.js. Keep in mind that if you run this code multiple times it can throw an error because the topic already exists.
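One way to avoid that error is to check the existing topics first; a sketch of a guarded setup using admin.listTopics() could look like this:

const admin = kafka.admin()
await admin.connect()
const existing = await admin.listTopics()
if (!existing.includes('test')) {
  // only create the topic the first time around
  await admin.createTopics({
    waitForLeaders: true,
    topics: [{ topic: 'test' }],
  })
}
await admin.disconnect()

The complete subscriber.js looks like this.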
const express = require('express')
const { Kafka } = require('kafkajs')

const app = express()
const port = 3500
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

app.listen(port, async () => {
  // create the topic before consuming from it
  const admin = kafka.admin()
  try {
    await admin.connect()
    await admin.createTopics({
      waitForLeaders: true,
      topics: [{ topic: 'test' }],
    })
  } catch (err) {
    console.error(err)
  } finally {
    await admin.disconnect()
  }

  // consume every message published to the "test" topic
  const consumer = kafka.consumer({ groupId: 'test-group' })
  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Received ${message.value.toString()} from channel ${topic} (Kafka)`)
    },
  })
  console.log(`Example app (subscriber) at http://localhost:${port}`)
})
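When you stop the process it is a good idea to disconnect the consumer so its group rebalances quickly. A minimal sketch of a shutdown hook, assuming it is registered in the same scope where consumer is defined:

// close the Kafka consumer cleanly on Ctrl+C
process.on('SIGINT', async () => {
  await consumer.disconnect()
  process.exit(0)
})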
That's all we need to work with Kafka. Now we can run the same commands we saw before.
$ docker-compose up -d
$ node subscriber.js
$ node publisher.js // open in another terminal
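This time the subscriber terminal should print something like:

Received Hello KafkaJS user! from channel test (Kafka)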
I hope it was useful for you and that you can start working with these technologies. If you have any corrections, let me know.