Introduction to Kafka
Kafka is an open-source, distributed messaging system built on a publish/subscribe model. Many large companies use it for high-throughput, real-time data streaming.
Originally developed at LinkedIn and open-sourced in 2011, Kafka is now an Apache Software Foundation project and one of the most widely used distributed streaming platforms. It can process very large volumes of records efficiently.
Advantages of Kafka
- Open-source: Freely available and continuously improved by a large community.
- High throughput: Can continuously process large volumes of data across many topics.
- Durable message storage: Messages are persisted to disk for a configurable retention period, so they can be re-read and verified later.
- Large user community: Offers extensive support and shared resources.
Basic Concepts
If you're new to Kafka and message queues, here are some key concepts to understand:
- Producer: An application that creates data and sends it to the Kafka server. Each message is sent as a byte array.
- Consumer: An application that subscribes to one or more topics and reads the messages producers write to them.
- Consumer Group: A set of consumers sharing the same group ID that split the work of processing a topic. Each partition is read by only one consumer in the group at a time.
- Topic: A named stream of messages. Producers write messages to a topic, and consumers fetch them from it.
- Broker: A Kafka server that stores topic data and handles requests from producers and consumers, acting as the intermediary between them.
- Cluster: A Kafka cluster consists of multiple servers, each known as a broker.
- ZooKeeper: A service traditionally used to manage brokers and cluster metadata. Kafka now supports KRaft mode, which lets it run without ZooKeeper.
- Partition: Each topic is split into one or more partitions, which can be spread across different brokers in the cluster. Messages within a partition are stored in order, and each one gets a sequential offset. The producer chooses the partition for each message, for example by hashing the message key.
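To make the Partition and Consumer Group ideas more concrete, here is a toy TypeScript sketch. The names `hashKey`, `pickPartition`, and `assignPartitions` are hypothetical. The FNV-1a hash is a simple stand-in for the murmur2 hash that Kafka's default partitioner actually uses, and the round-robin assignment is a simplified version of what a consumer group's partition assignor does. It illustrates the idea only and is not Kafka's implementation.

```typescript
// Toy illustration only: NOT Kafka's real partitioner (which uses murmur2)
// and NOT a real kafkajs assignor.

// FNV-1a 32-bit hash over the UTF-16 code units of the key (a simplified stand-in).
function hashKey(key: string): number {
  let h = 0x811c9dc5
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i)
    h = Math.imul(h, 0x01000193) >>> 0
  }
  return h >>> 0
}

// The same key always maps to the same partition, so messages sharing a key stay in order.
function pickPartition(key: string, numPartitions: number): number {
  return hashKey(key) % numPartitions
}

// Round-robin assignment: each partition goes to exactly one consumer in the group.
function assignPartitions(partitions: number[], consumers: string[]): Map<string, number[]> {
  const assignment = new Map<string, number[]>()
  if (consumers.length === 0) return assignment
  // Sort member IDs so every member computes the same assignment
  const members = consumers.slice().sort()
  for (const member of members) assignment.set(member, [])
  partitions.forEach((partition, i) => {
    assignment.get(members[i % members.length])!.push(partition)
  })
  return assignment
}

console.log(pickPartition('user-42', 3))
console.log(assignPartitions([0, 1, 2, 3, 4, 5], ['consumer-b', 'consumer-a']))
```

With six partitions and two consumers, each consumer ends up owning three partitions. Adding a third consumer would spread them two per consumer, and a seventh consumer would sit idle because there are only six partitions.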
Using Kafka with Docker
First, create a `docker-compose.yml` file with the following content:
```yaml
version: "3"
services:
  kafka:
    image: bitnami/kafka
    # Fixed container name so `docker exec -it kafka bash` works
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
```
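This configuration starts Kafka in KRaft mode, so ZooKeeper isn't needed. Port 9092 is published to the host, while other containers on the same Compose network can reach the broker through the `EXTERNAL` listener at `kafka:9094`.
Next, start the container from the directory containing `docker-compose.yml` with `docker compose up -d` (or `docker-compose up -d` if you use the older standalone Compose).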
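Which bootstrap address a client should use depends on where the client runs. In the compose file, the `PLAINTEXT` listener on port 9092 is published to the host and advertised as `127.0.0.1:9092`. The `EXTERNAL` listener is advertised as `kafka:9094`, a name that only other containers on the same Compose network can resolve. The sketch below encodes that choice. The `ClientLocation` type and `bootstrapBrokers` helper are hypothetical names, and the addresses assume the compose file is used unchanged.

```typescript
// Hypothetical helper: map where the client runs to the listener it should use,
// assuming the docker-compose.yml above is used as-is.
type ClientLocation = 'host' | 'compose-network'

const BOOTSTRAP: Record<ClientLocation, string[]> = {
  // PLAINTEXT listener, published to the host as 9092:9092
  host: ['localhost:9092'],
  // EXTERNAL listener, advertised as kafka:9094 (port 9094 is not published to the host)
  'compose-network': ['kafka:9094'],
}

function bootstrapBrokers(location: ClientLocation): string[] {
  return BOOTSTRAP[location]
}

console.log(bootstrapBrokers('host'))
```

The result can be passed straight to the client, for example `new Kafka({brokers: bootstrapBrokers('host')})` when using `kafkajs`.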
Example of sending a message in Kafka
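To open a shell inside the Kafka container:
```bash
docker exec -it kafka bash
```
Create a new topic with the following command. Inside the container, the broker is reachable at `localhost:9092`.
```bash
kafka-topics.sh --bootstrap-server {kafka host}:{port} --create --topic {topic name}
```
Use the console producer to send messages to the newly created topic. Each line you type is sent as one message.
```bash
kafka-console-producer.sh --topic {topic name} --bootstrap-server {kafka host}:{port}
```
Use the console consumer to read the messages from the topic. The `--from-beginning` flag makes it start from the earliest available message instead of only new ones.
```bash
kafka-console-consumer.sh --topic {topic name} --from-beginning --bootstrap-server {kafka host}:{port}
```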
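Kafka keeps messages in the partition log after they are read, which is why a consumer can start from the beginning. The toy model below shows how a consumer picks its starting offset. If its group has a committed offset, it resumes there. Otherwise, a "from beginning" setting chooses between the earliest and the latest position, which is how `kafkajs` describes its `fromBeginning` option. `PartitionLog` and `startingOffset` are hypothetical names. The model is in-memory only and assumes retention has deleted nothing, so the earliest offset is always 0. It is not Kafka's implementation.

```typescript
// Toy, in-memory model of one partition log. Real Kafka stores the log on disk
// and may delete old segments via retention; here the earliest offset is always 0.
interface LogEntry {
  offset: number
  value: string
}

class PartitionLog {
  private readonly values: string[] = []

  // Appending assigns the next sequential offset and returns it.
  append(value: string): number {
    this.values.push(value)
    return this.values.length - 1
  }

  // Reading does not remove anything, so the same entries can be read again.
  readFrom(offset: number): LogEntry[] {
    return this.values.slice(offset).map((value, i) => ({offset: offset + i, value}))
  }

  // Offset the next appended message will receive (the "latest" position).
  endOffset(): number {
    return this.values.length
  }
}

// committed: the group's committed offset (the next offset to read), if any.
function startingOffset(log: PartitionLog, committed: number | undefined, fromBeginning: boolean): number {
  if (committed !== undefined) return committed
  return fromBeginning ? 0 : log.endOffset()
}

const log = new PartitionLog()
for (const v of ['a', 'b', 'c']) log.append(v)
console.log(log.readFrom(startingOffset(log, undefined, true)).map(e => e.value))
```

Reading from the start offset returns all three messages, and reading again returns them again, because consuming does not delete anything from the log.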
Connecting to Kafka with NodeJS
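First, install the `kafkajs` package, which lets Node.js applications connect to Kafka: `npm install kafkajs`.
Here is an example, written in TypeScript, to help you get started. It sends three messages to a topic and then consumes them: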
```typescript
import {Kafka} from 'kafkajs'

const main = async (): Promise<void> => {
  const brokers = ['localhost:9092'],
    groupId = 'group-id-value',
    topic = 'topic-name'

  // Build `length` messages whose value includes the current timestamp
  const getMessage = (length: number) => Array.from({length}).map(() => ({value: 'message value ' + Date.now()}))

  const kafka = new Kafka({
    brokers,
    connectionTimeout: 1000,
    authenticationTimeout: 1000,
    reauthenticationThreshold: 3000,
  })

  const producer = kafka.producer()
  const consumer = kafka.consumer({groupId})

  // Producing
  await producer.connect()
  await producer.send({
    topic,
    messages: getMessage(3),
  })
  await producer.disconnect()

  // Consuming
  await consumer.connect()
  await consumer.subscribe({topic, fromBeginning: true})
  await consumer.run({
    // Define only one of eachMessage or eachBatch
    eachMessage: async ({topic, partition, message}) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        // value can be null (e.g. tombstone messages)
        value: message.value?.toString(),
      })
    },
    // Uncomment (and remove eachMessage) to receive messages in batches
    // eachBatch: async ({batch}) => {
    //   const topic = batch.topic,
    //     partition = batch.partition,
    //     messages = batch.messages
    //   messages.forEach(message => {
    //     console.log({topic, partition, timestamp: message.timestamp, key: message.key?.toString(), value: message.value?.toString()})
    //   })
    // },
  })
}

main().catch(console.error)
```
In `consumer.run`, the `eachMessage` handler is called once for every message received, while the `eachBatch` handler is called once for each batch of messages fetched from a single topic partition. This is useful when you want to process several messages together.
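When using `eachBatch`, the `kafkajs` documentation shows a pattern of marking each message as processed with `resolveOffset` and calling `heartbeat` while working through the batch, stopping early when `isRunning()` is false or `isStale()` is true. The sketch below follows that pattern. To keep it compilable without `kafkajs`, it declares a minimal local `BatchPayload` type that is assumed to match the relevant part of the real payload. The `handleBatch` function and its `onValue` callback are hypothetical names.

```typescript
// Minimal structural subset of the payload kafkajs passes to eachBatch.
// Declared locally (an assumption about its shape) so this sketch compiles on its own.
interface BatchMessage {
  offset: string
  value: {toString(): string} | null
}

interface BatchPayload {
  batch: {topic: string; partition: number; messages: BatchMessage[]}
  resolveOffset: (offset: string) => void
  heartbeat: () => Promise<void>
  isRunning: () => boolean
  isStale: () => boolean
}

// Process each message in the batch, mark its offset as processed, and send a
// heartbeat so the group coordinator knows this consumer is still alive.
async function handleBatch(payload: BatchPayload, onValue: (value: string) => void): Promise<void> {
  const {batch, resolveOffset, heartbeat, isRunning, isStale} = payload
  for (const message of batch.messages) {
    // Stop early if the consumer is shutting down or the batch is outdated (e.g. after a rebalance)
    if (!isRunning() || isStale()) break
    onValue(message.value?.toString() ?? '')
    resolveOffset(message.offset)
    await heartbeat()
  }
}
```

With `kafkajs`, this could be wired in as `eachBatch: payload => handleBatch(payload, value => console.log(value))`, assuming the real payload is structurally compatible with `BatchPayload`. Resolving offsets per message means that if processing fails halfway through a batch, only the unprocessed messages are fetched again.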