Introduction to Kafka using NodeJs
Building a B2B healthcare product from scratch for the U.S market
This is a small article intended for node.js developers who intend to start implementing distributed messaging system using Kakfa.
I am planning to write a series of articles demonstrating the usage of Kafka and Storm. This article is the first of the same series. So let's begin.
1.1 What is Kafka ?
Kafka is a distributed messaging system providing fast, highly scalable and redundant messaging through a pub-sub model. Kafka’s distributed design gives it several advantages. First, Kafka allows a large number of permanent or ad-hoc consumers. Second, Kafka is highly available and resilient to node failures and supports automatic recovery. In real world data systems, these characteristics make Kafka an ideal fit for communication and integration between components of large scale data systems.
The Kafka Documentation has done an excellent job in explaining the entire architecture.
Before Moving ahead i would suggest the reader to go through the following link. It is very important to understand the architecture.
https://kafka.apache.org/intro
1.2 Installing & Running Zookeeper and Kafka
Kafka can be downloaded from the following link. I am using the current stable release i.e. 0.10.1.1.
https://kafka.apache.org/downloads
Download the tar. Un-tar it and then follow the steps below:
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. Run the following command to start ZooKeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Now to start kafka run the following command:
$ bin/kafka-server-start.sh config/server.properties
1.3 Creating Kafka Topic and playing with it
Let's create one topic and play with it. Below is the command to create a topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Posts
Once you create the topic, you can see the available topics with below command:
$bin/kafka-topics.sh --list --zookeeper localhost:2181
For testing kafka, we can use the kafka-console-producer to send a message
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Posts
We can consume all the messages of the same topic by creating a consumer as below:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Posts --from-beginning
1.3 Integrating Kafka with NodeJS
Let's create a API in NodeJS which will act as a Producer to Kafka. We will be then creating another consumer in NodeJS which will be consuming the topic we created above.
We will be using kafka-node and express module for our producer.
var express = require('express');
var kafka = require('kafka-node');
var app = express();
Let's add the code to handle JSON in our api.
var bodyParser = require('body-parser')
app.use( bodyParser.json() ); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
extended: true
}));
Now in order to create a kafka producer where you have non-keyed partition, you can simply add the following code
var Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
Now let's add some event handler for our producer. These will help us know the state of the producer.
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
Now Before going into producing a message to a kafka topic, let us first create a simple route and test our api. Add the below code
app.get('/',function(req,res){
res.json({greeting:'Kafka Consumer'})
});
app.listen(5001,function(){
console.log('Kafka producer running at 5001')
});
So, Now the entire code looks like below:
var express = require('express');
var kafka = require('kafka-node');
var app = express();
var bodyParser = require('body-parser')
app.use( bodyParser.json() ); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
extended: true
}));
var Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
app.get('/',function(req,res){
res.json({greeting:'Kafka Producer'})
});
app.listen(5001,function(){
console.log('Kafka producer running at 5001')
})
So let's run the code and test our api in postman.
Now lets create a route which can post some message to the topic.
For the nodejs client, kafka has a producer.send() method which takes two arguments. the first being "payloads" which is an array of ProduceRequest. ProduceRequest is a JSON object like:
{
topic: 'topicName',
messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
key: 'theKey', // only needed when using keyed partitioner (optional)
partition: 0, // default 0 (optional)
attributes: 2 // default: 0 used for compression (optional)
}
Add the below code to get the topic and the message to be sent .
app.post('/sendMsg',function(req,res){
var sentMessage = JSON.stringify(req.body.message);
payloads = [
{ topic: req.body.topic, messages:sentMessage , partition: 0 }
];
producer.send(payloads, function (err, data) {
res.json(data);
});
})
Now let's run the code and hit our api with a payload. Once the producer pushes the message to the topic, we can see the message get consumed in the default shell consumer we created earlier.
Now Let's create a simple consumer for this in nodejs.
In NodeJS, Kafka consumers can be created using multiple ways. The following is the most simple one out of them all:
Consumer(client, payloads, options)
It takes 3 arguments as above. "client" is the one which keeps a connection with the Kafka server. payloads is an array of FetchRequest, FetchRequest is a JSON object like:
{
topic: 'topicName',
offset: 0, //default 0
}
the all possible options for the client are as below:
{
groupId: 'kafka-node-group',//consumer group id, default `kafka-node-group`
// Auto commit config
autoCommit: true,
autoCommitIntervalMs: 5000,
// The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms
fetchMaxWaitMs: 100,
// This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
fetchMinBytes: 1,
// The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
fetchMaxBytes: 1024 * 1024,
// If set true, consumer will fetch message from the given offset in the payloads
fromOffset: false,
// If set to 'buffer', values will be returned as raw buffer objects.
encoding: 'utf8'
}
So let's add the code below to create a simple consumer.
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client,
[{ topic: 'Posts', offset: 0}],
{
autoCommit: false
}
);
Let us add some simple event handlers. One of which notifies us when a message is consumed. For simplicity of the article, let us just do console.log
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange:',err);
})
The entire code of the consumer looks like below:
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client,
[{ topic: 'Posts', offset: 0}],
{
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange:',err);
})
Before testing this consumer, let us first kill the shell consumer. Then hit our producer api
This is the end of this article. But in future articles i am planning to showcase a bit more complicated usage of Kafka.
Hope this article helps!
1.2 Installing & Running Zookeeper and Kafka
Kafka can be downloaded from the following link. I am using the current stable release i.e. 0.10.1.1.
https://kafka.apache.org/downloads
Download the tar. Un-tar it and then follow the steps below:
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. Run the following command to start ZooKeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Now to start kafka run the following command:
$ bin/kafka-server-start.sh config/server.properties
1.3 Creating Kafka Topic and playing with it
Let's create one topic and play with it. Below is the command to create a topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Posts
Once you create the topic, you can see the available topics with below command:
$bin/kafka-topics.sh --list --zookeeper localhost:2181
For testing kafka, we can use the kafka-console-producer to send a message
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Posts
We can consume all the messages of the same topic by creating a consumer as below:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Posts --from-beginning
1.3 Integrating Kafka with NodeJS
Let's create a API in NodeJS which will act as a Producer to Kafka. We will be then creating another consumer in NodeJS which will be consuming the topic we created above.
We will be using kafka-node and express module for our producer.
var express = require('express');
var kafka = require('kafka-node');
var app = express();
Let's add the code to handle JSON in our api.
var bodyParser = require('body-parser')
app.use( bodyParser.json() ); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
extended: true
}));
Now in order to create a kafka producer where you have non-keyed partition, you can simply add the following code
var Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
Now let's add some event handler for our producer. These will help us know the state of the producer.
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
Now Before going into producing a message to a kafka topic, let us first create a simple route and test our api. Add the below code
app.get('/',function(req,res){
res.json({greeting:'Kafka Consumer'})
});
app.listen(5001,function(){
console.log('Kafka producer running at 5001')
});
So, Now the entire code looks like below:
var express = require('express');
var kafka = require('kafka-node');
var app = express();
var bodyParser = require('body-parser')
app.use( bodyParser.json() ); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ // to support URL-encoded bodies
extended: true
}));
var Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
app.get('/',function(req,res){
res.json({greeting:'Kafka Producer'})
});
app.listen(5001,function(){
console.log('Kafka producer running at 5001')
})
So let's run the code and test our api in postman.
Now lets create a route which can post some message to the topic.
For the nodejs client, kafka has a producer.send() method which takes two arguments. the first being "payloads" which is an array of ProduceRequest. ProduceRequest is a JSON object like:
{
topic: 'topicName',
messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
key: 'theKey', // only needed when using keyed partitioner (optional)
partition: 0, // default 0 (optional)
attributes: 2 // default: 0 used for compression (optional)
}
Add the below code to get the topic and the message to be sent .
app.post('/sendMsg',function(req,res){
var sentMessage = JSON.stringify(req.body.message);
payloads = [
{ topic: req.body.topic, messages:sentMessage , partition: 0 }
];
producer.send(payloads, function (err, data) {
res.json(data);
});
})
Now let's run the code and hit our api with a payload. Once the producer pushes the message to the topic, we can see the message get consumed in the default shell consumer we created earlier.
Now Let's create a simple consumer for this in nodejs.
In NodeJS, Kafka consumers can be created using multiple ways. The following is the most simple one out of them all:
Consumer(client, payloads, options)
It takes 3 arguments as above. "client" is the one which keeps a connection with the Kafka server. payloads is an array of FetchRequest, FetchRequest is a JSON object like:
{
topic: 'topicName',
offset: 0, //default 0
}
the all possible options for the client are as below:
{
groupId: 'kafka-node-group',//consumer group id, default `kafka-node-group`
// Auto commit config
autoCommit: true,
autoCommitIntervalMs: 5000,
// The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms
fetchMaxWaitMs: 100,
// This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
fetchMinBytes: 1,
// The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
fetchMaxBytes: 1024 * 1024,
// If set true, consumer will fetch message from the given offset in the payloads
fromOffset: false,
// If set to 'buffer', values will be returned as raw buffer objects.
encoding: 'utf8'
}
So let's add the code below to create a simple consumer.
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client,
[{ topic: 'Posts', offset: 0}],
{
autoCommit: false
}
);
Let us add some simple event handlers. One of which notifies us when a message is consumed. For simplicity of the article, let us just do console.log
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange:',err);
})
The entire code of the consumer looks like below:
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client,
[{ topic: 'Posts', offset: 0}],
{
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange:',err);
})
Before testing this consumer, let us first kill the shell consumer. Then hit our producer api
This is the end of this article. But in future articles i am planning to showcase a bit more complicated usage of Kafka.
Hope this article helps!
No comments:
Post a Comment