Using Kafka with Docker and NodeJS

Introduction to Kafka

Kafka is an open-source, distributed messaging system built on a publish/subscribe model. Many large companies use it for high-throughput, real-time data streaming.

Kafka was originally developed at LinkedIn and open-sourced in 2011. It is now an Apache Software Foundation project and one of the most widely used distributed streaming platforms, able to process very large volumes of records efficiently.


Advantages of Kafka

  • Open-source: Freely available and continuously improved by a large community.
  • High throughput: Can continuously process large volumes of data across many topics.
  • Durable message storage: Messages are persisted to disk for a configurable retention period, so they can be read again (replayed) and verified after they have been consumed.
  • Large user community: Offers extensive support and shared resources.
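A traditional queue usually deletes a message once it has been consumed. Kafka instead appends messages to a log per partition and leaves them there, and each reader tracks its own position (its offset). That is what makes replay and verification possible.

The `PartitionLog` class below is a hypothetical, in-memory toy that shows the idea. It ignores retention, disk persistence, and replication, so it is not how Kafka actually stores data.

```typescript
// Toy in-memory model of one partition's append-only log (hypothetical; not Kafka's storage format).
interface StoredMessage {
  offset: number
  value: string
}

class PartitionLog {
  private readonly messages: StoredMessage[] = []

  // Appends a message to the end of the log and returns the offset it was stored at.
  append(value: string): number {
    const offset = this.messages.length
    this.messages.push({offset, value})
    return offset
  }

  // Returns up to maxMessages messages starting at fromOffset. Nothing is removed,
  // so any reader can come back later and read the same messages again.
  read(fromOffset: number, maxMessages = Number.POSITIVE_INFINITY): StoredMessage[] {
    return this.messages.slice(fromOffset, fromOffset + maxMessages)
  }
}

const log = new PartitionLog()
log.append('order created')
log.append('order paid')
log.append('order shipped')

// Two independent readers: one replays from the beginning, one resumes at offset 2.
console.log(log.read(0).map(m => m.value))
console.log(log.read(2).map(m => m.value))
```

Because reading never deletes anything, a new reader can start from offset 0 at any time without affecting readers that are already further along.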


Basic Concepts

If you're new to Kafka and message queues, here are the key concepts to understand:

  • Producer: A client that creates data and sends it to Kafka as messages. Kafka stores each message's key and value as a byte array.
  • Consumer: A client that subscribes to one or more topics and reads the messages producers have written to them.
  • Consumer Group: A set of consumers sharing the same group ID that split the work of reading a topic. Each partition is read by only one consumer in the group at a time.
  • Topic: A named stream of messages. Producers write to a topic, and consumers read from it.
  • Broker: A Kafka server that stores messages and serves them to consumers, acting as the intermediary between producers and consumers.
  • Cluster: A Kafka cluster consists of multiple servers, each known as a broker.
  • ZooKeeper: A service traditionally used to manage brokers and cluster metadata. Kafka now also supports KRaft mode, which runs without ZooKeeper.
  • Partition: Each topic is split into one or more partitions, which can be spread across different brokers in the cluster. Messages sent to a topic are distributed across its partitions (messages with the same key go to the same partition), and ordering is guaranteed only within a single partition.
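The sketch below makes partitions and consumer groups more concrete. It shows two rules: a message key always maps to the same partition, and a group's consumers divide the partitions so that each partition has exactly one owner.

All names here are hypothetical, and the logic is simplified:

  • `choosePartition` uses FNV-1a only because it is short. Kafka's default partitioners hash the key with murmur2.
  • `assignRoundRobin` ignores rebalancing and the other assignment strategies.

```typescript
// FNV-1a (32-bit) over UTF-16 code units. Used here for brevity only;
// Kafka's default partitioners hash the serialized key with murmur2 instead.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  return hash >>> 0 // convert to an unsigned 32-bit integer
}

// The same key always maps to the same partition, which preserves per-key ordering.
function choosePartition(key: string, numPartitions: number): number {
  return fnv1a(key) % numPartitions
}

// Simplified round-robin assignment of a topic's partitions to one consumer group:
// every partition gets exactly one owner; extra consumers receive nothing.
function assignRoundRobin(numPartitions: number, consumerIds: string[]): Map<string, number[]> {
  const members = [...consumerIds].sort()
  const assignment = new Map<string, number[]>()
  for (const id of members) {
    assignment.set(id, [])
  }
  if (members.length === 0) {
    return assignment
  }
  for (let partition = 0; partition < numPartitions; partition++) {
    const owner = members[partition % members.length]
    assignment.get(owner)!.push(partition)
  }
  return assignment
}

console.log(choosePartition('user-42', 3))
console.log(assignRoundRobin(3, ['consumer-a', 'consumer-b'])) // a: [0, 2], b: [1]
console.log(assignRoundRobin(2, ['consumer-a', 'consumer-b', 'consumer-c'])) // consumer-c gets []
```

The last call shows a practical consequence of this design. A group with more consumers than partitions leaves some consumers idle, so the partition count caps how far a group can scale.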


Using Kafka with Docker

First, create a `docker-compose.yml` file with the following content:

```yaml
version: "3"
services:
  kafka:
    image: bitnami/kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
```

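This configuration starts Kafka in KRaft mode, so ZooKeeper isn't needed. The single container acts as both broker and controller (`KAFKA_CFG_PROCESS_ROLES=broker,controller`) and defines three listeners:

  • PLAINTEXT (port 9092): Advertised as `127.0.0.1:9092` and published to the host, so clients on your machine, such as the NodeJS example below, connect to `localhost:9092`.
  • CONTROLLER (port 9093): Used internally by the KRaft controller quorum.
  • EXTERNAL (port 9094): Advertised as `kafka:9094` and not published to the host, so only other containers on the same Compose network can use it.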
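`KAFKA_CFG_LISTENERS` and `KAFKA_CFG_ADVERTISED_LISTENERS` both hold a comma-separated list of `NAME://host:port` entries. The listeners setting says where the broker binds. The advertised listeners setting says which address the broker tells clients to connect to.

The helpers below are hypothetical and not part of Kafka or the bitnami image. They parse that format to show which address each kind of client ends up using. IPv6 hosts are not handled.

```typescript
// Hypothetical helper for illustration; not part of Kafka or the Docker image.
interface Listener {
  name: string
  host: string // an empty host (e.g. "PLAINTEXT://:9092") binds to the default interface
  port: number
}

// Parses a comma-separated "NAME://host:port" list.
function parseListeners(value: string): Listener[] {
  return value.split(',').map(entry => {
    const match = /^(\w+):\/\/([^:]*):(\d+)$/.exec(entry.trim())
    if (match === null) {
      throw new Error(`Unrecognized listener entry: ${entry}`)
    }
    return {name: match[1], host: match[2], port: Number(match[3])}
  })
}

// Returns the "host:port" a client is told to use for the named advertised listener.
function addressFor(advertised: Listener[], name: string): string {
  const listener = advertised.find(l => l.name === name)
  if (listener === undefined) {
    throw new Error(`No advertised listener named ${name}`)
  }
  return `${listener.host}:${listener.port}`
}

const advertised = parseListeners('PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094')
console.log(addressFor(advertised, 'PLAINTEXT')) // 127.0.0.1:9092 (clients on the host machine)
console.log(addressFor(advertised, 'EXTERNAL')) // kafka:9094 (other containers on the Compose network)
```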


Next, start the container in the background:

```shell
docker compose up -d
```
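The broker takes a few seconds to start after `docker compose up -d`, so a client that connects immediately can fail. The hypothetical `waitForPort` helper below uses only Node's built-in `net` module and retries a TCP connection until it succeeds or a timeout passes.

An open port only shows that the listener accepts connections, not that the broker has fully started. KafkaJS's own connection retries still matter.

```typescript
import * as net from 'node:net'

// Hypothetical helper: resolves once a TCP connection to host:port succeeds,
// retrying every intervalMs until timeoutMs has elapsed, then rejects.
function waitForPort(host: string, port: number, timeoutMs = 30000, intervalMs = 500): Promise<void> {
  const deadline = Date.now() + timeoutMs
  return new Promise((resolve, reject) => {
    const attempt = (): void => {
      const socket = net.createConnection({host, port})
      socket.once('connect', () => {
        socket.destroy() // we only needed to know the port is open
        resolve()
      })
      socket.once('error', (error: Error) => {
        socket.destroy()
        if (Date.now() >= deadline) {
          reject(new Error(`Timed out waiting for ${host}:${port}: ${error.message}`))
        } else {
          setTimeout(attempt, intervalMs) // not ready yet; try again shortly
        }
      })
    }
    attempt()
  })
}
```

For example, `await waitForPort('localhost', 9092)` could run before `producer.connect()` in a script that starts right after the container.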


Example of sending a message in Kafka

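Open a shell inside the Kafka container. Run this from the directory that contains `docker-compose.yml`, since `kafka` is the Compose service name:

```shell
docker compose exec kafka bash
```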


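Create a new topic with the following command, replacing the placeholders. Inside the container, `{kafka host}:{port}` is `localhost:9092`.

```shell
kafka-topics.sh --bootstrap-server {kafka host}:{port} --create --topic {topic name}
```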



Use the console producer to send messages to the new topic. Each line you type is sent as a separate message; press Ctrl+C to exit.

```shell
kafka-console-producer.sh --topic {topic name} --bootstrap-server {kafka host}:{port}
```



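In a second terminal (also inside the container), use the console consumer to read messages from the topic. `--from-beginning` makes it read the messages already stored in the topic, not just new ones.

```shell
kafka-console-consumer.sh --topic {topic name} --from-beginning --bootstrap-server {kafka host}:{port}
```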
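The "from beginning" setting matters only when a consumer group has no committed offset yet. A group that has already committed an offset resumes from it, and otherwise the setting picks between the oldest and the newest offset. KafkaJS's `fromBeginning` option follows the same rule. The console consumer joins a new, randomly named group when `--group` is not given, so `--from-beginning` takes effect on every run.

The hypothetical `resolveStartOffset` function below is a simplified model of that rule for one partition. It ignores committed offsets that have fallen outside the retained range.

```typescript
// Hypothetical, simplified model of where a consumer group starts reading one partition.
function resolveStartOffset(
  committedOffset: number | undefined, // last offset committed by this group, if any
  fromBeginning: boolean, // --from-beginning (CLI) or fromBeginning: true (KafkaJS)
  logStartOffset: number, // oldest offset still retained in the partition
  logEndOffset: number, // offset the next new message will receive
): number {
  if (committedOffset !== undefined) {
    return committedOffset // an existing group resumes where it left off
  }
  return fromBeginning ? logStartOffset : logEndOffset
}

console.log(resolveStartOffset(undefined, true, 0, 5)) // 0: new group, reads everything
console.log(resolveStartOffset(undefined, false, 0, 5)) // 5: new group, only new messages
console.log(resolveStartOffset(3, true, 0, 5)) // 3: the committed offset wins
```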


Connecting to Kafka with NodeJS

First, install KafkaJS, a Kafka client library for NodeJS (or run `npm install kafkajs`):

```shell
yarn add kafkajs
```


The following TypeScript example sends three messages to a topic and then consumes them from the beginning:

```ts
import {Kafka} from 'kafkajs'

const main = async (): Promise<void> => {
  const brokers = ['localhost:9092'],
    groupId = 'group-id-value',
    topic = 'topic-name'

  // Build `length` messages whose value includes the current timestamp
  const getMessage = (length: number) =>
    Array.from({length}).map(() => ({value: 'message value ' + Date.now()}))

  const kafka = new Kafka({
    brokers,
    connectionTimeout: 1000,
    authenticationTimeout: 1000,
    reauthenticationThreshold: 3000,
  })

  const producer = kafka.producer()
  const consumer = kafka.consumer({groupId})

  // Producing
  await producer.connect()
  await producer.send({
    topic,
    messages: getMessage(3),
  })
  await producer.disconnect() // the producer is not needed after sending

  // Consuming
  await consumer.connect()
  await consumer.subscribe({topic, fromBeginning: true})

  await consumer.run({
    // Provide either eachMessage or eachBatch, not both
    eachMessage: async ({topic, partition, message}) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        // value is a Buffer, or null for a message without a value
        value: message.value?.toString(),
      })
    },

    // Uncomment (and remove eachMessage) to receive each fetched batch of messages at once
    // eachBatch: async ({batch}) => {
    //   const topic = batch.topic,
    //     partition = batch.partition,
    //     messages = batch.messages
    //   messages.forEach(message => {
    //     console.log({topic, partition, timestamp: message.timestamp, key: message.key?.toString(), value: message.value?.toString()})
    //   })
    // },
  })
}

main().catch(error => {
  console.error(error)
  process.exit(1)
})
```

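In `consumer.run`, the `eachMessage` handler is called once per message, in order. The `eachBatch` handler is called once with all the messages fetched from a partition in a single request. Use `eachBatch` when you want to process or commit several messages together, and provide only one of the two handlers.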
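The simulation below contrasts the two handler styles without needing a broker. It uses hypothetical `SimMessage` and `SimBatch` types loosely modeled on the payloads KafkaJS passes to `eachMessage` and `eachBatch`. It is an illustration of the calling pattern, not KafkaJS's internal implementation.

```typescript
// Hypothetical shapes, loosely modeled on the payloads KafkaJS passes to its handlers.
interface SimMessage {
  offset: string
  value: Buffer | null
}

interface SimBatch {
  topic: string
  partition: number
  messages: SimMessage[]
}

type EachMessageHandler = (payload: {topic: string; partition: number; message: SimMessage}) => Promise<void>
type EachBatchHandler = (payload: {batch: SimBatch}) => Promise<void>

// eachMessage style: one awaited call per message, preserving the partition's order.
async function dispatchEachMessage(batch: SimBatch, handler: EachMessageHandler): Promise<void> {
  for (const message of batch.messages) {
    await handler({topic: batch.topic, partition: batch.partition, message})
  }
}

// eachBatch style: a single call that receives every message in the batch.
async function dispatchEachBatch(batch: SimBatch, handler: EachBatchHandler): Promise<void> {
  await handler({batch})
}

const demoBatch: SimBatch = {
  topic: 'topic-name',
  partition: 0,
  messages: ['first', 'second', 'third'].map((text, i) => ({offset: String(i), value: Buffer.from(text)})),
}

dispatchEachMessage(demoBatch, async ({message}) => console.log('eachMessage', message.offset, message.value?.toString()))
  .then(() => dispatchEachBatch(demoBatch, async ({batch}) => console.log('eachBatch', batch.messages.length)))
```

With three messages, the per-message handler runs three times, while the batch handler runs once and sees all three.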
