Kafka-node consumer benchmark

This code can be used to benchmark consumer throughput for a Kafka cluster. Args:

groupId -- (str) Kafka consumer group id, default: bench
concurrency -- (int) number of worker processes to fork, defaults to the number of CPUs on the current host
duration -- (int) how long to run the benchmark for, in milliseconds, default: 20000 (20s)
topic -- (str) the Kafka topic to consume from, defaults to observations.json
zk -- (str) ZooKeeper URL, defaults to localhost:2181
useHLC -- (bool) whether to use a simple consumer or a high-level consumer; see the Kafka docs for an explanation. Defaults to false

E.g.:

node master.js --groupId=test --concurrency=2 --duration=60000 --topic=test --zk=localhost:2181/zkchroot --useHLC=true

Or, for just a single worker:

node kafka_consumer_worker.js

Prereqs:

You need a few libs:

npm install kafka-node optimist

If you get an error installing kafka-node due to a failure to compile its snappy dependency, you can replace it with kafka-node--light, which removes the dependency on snappy: https://github.com/kaustavha/kafka-node--light. Then change the require statements from require('kafka-node') to require('kafka-node--light'). Note that if you're using snappy compression in Kafka, this build won't be able to decompress your messages, and you'll see lower throughput since messages are chunked and compressed together.
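A minimal sketch of the swap (assuming kafka-node--light exposes the same API as kafka-node):

npm install kafka-node--light

// at the top of kafka_consumer_worker.js, instead of require('kafka-node'):
var kafka = require('kafka-node--light');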

Results:

All these benchmarks were run on a 12-core AMD Opteron(tm) Processor 4184 (800MHz) server with 64GB of RAM and 64 Kafka partitions.

Single worker, three consecutive runs:

Processed 407402 messages
Start Time 1452886344.33
Elapsed Time 60.134000062942505
TPS: 6774.902710173456
Total metrics processed: 2076641
Mean metric count per obv: 5.097277382045252
Mode: 2

Processed 411449 messages
Start Time 1452886457.638
Elapsed Time 60.14800000190735
TPS: 6840.6098288713265
Total metrics processed: 2097194
Mean metric count per obv: 5.097093442929744
Mode: 2

Processed 474156 messages
Start Time 1452886560.338
Elapsed Time 60.01300001144409
TPS: 7900.888139396156
Total metrics processed: 2414438
Mean metric count per obv: 5.092075182007609
Mode: 2
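(TPS here is simply messages processed divided by elapsed time in seconds, e.g. 407402 / 60.134 ≈ 6775 for the first run.)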

Using a simple consumer and variable worker counts:

2 workers: ~16k TPS
4 workers: ~25k TPS
6 workers: ~40k TPS
12 workers: ~70k TPS
24 workers: ~57k TPS

Using the high-level consumer and variable worker counts:

6 workers: ~16k TPS
12 workers: ~40k TPS
32 workers: ~45k TPS
64 workers: ~55k TPS
96 workers: ~50k TPS

kafka_consumer_worker.js:

'use strict';
var kafka = require('kafka-node');
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = require('optimist').argv;
var cluster = require('cluster');
var os = require('os');

function createKafkaConsumer(config) {
  // Pick the consumer implementation; the HighLevelConsumer balances
  // partitions across group members via ZooKeeper.
  var Consumer = (config.useHLC === true || config.useHLC === 'true') ? kafka.HighLevelConsumer : kafka.Consumer;

  if (cluster.isWorker) {
    var worker = cluster.worker ? 'consumer' + cluster.worker.id + '-' + os.hostname() : 'defaultconsumer-' + os.hostname();
    config.worker = worker;
  } else {
    if (config.useHLC) console.log('Using a high level consumer');
    console.log('Using options: ');
    console.log(' topic: ' + config.topic + '\n timeout duration: ' + config.duration + '\n zk url: ' + config.zk);
  }

  var client = new Client(config.zk);
  var payloads = [
    {
      topic: config.topic,
      offset: 0
    }
  ];
  var options = {
    autoCommit: false,
    groupId: config.groupId,
    fromOffset: true // needed to actually start from the 0th offset
  };
  var consumer = new Consumer(client, payloads, options);
  var offset = new Offset(client);
  var metricsCount = [];
  var message_count = 0;
  var start_time = Date.now() / 1000;

  function stop() {
    consumer.pause();
    var end_time = Date.now() / 1000;
    // Calculate total metrics
    // var total = 0;
    // var modeMath = {};
    // metricsCount.forEach(function(cnt) {
    //   total += cnt;
    //   if (modeMath[cnt]) {
    //     modeMath[cnt] += 1;
    //   } else {
    //     modeMath[cnt] = 1;
    //   }
    // });
    // var max = 0, mode;
    // Object.keys(modeMath).forEach(function(key) {
    //   if (modeMath[key] > max) {
    //     max = modeMath[key];
    //     mode = key;
    //   }
    // });
    var results = {
      'TPS': (message_count / (end_time - start_time)),
      'Messages Processed': message_count,
      'Start Time': start_time,
      'Elapsed Time': (end_time - start_time)
      // 'Total metrics processed': total,
      // 'Mean metric count per obv': (total / message_count),
      // 'Mode': +mode
    };
    // Report to the master if we're a worker, else console.log
    if (cluster.isWorker) {
      process.send(results);
    } else {
      for (var resKey in results) {
        console.log(resKey + ' : ' + results[resKey]);
      }
    }
    process.exit(0);
  }

  process.on('SIGINT', stop);
  setTimeout(stop, config.duration); // duration is in milliseconds

  // Handle consumer events
  consumer.on('message', function (message) {
    if (message_count < 1) {
      console.log('Started dequeuing messages after ' + ((Date.now() / 1000) - start_time));
    }
    message_count++;
    // var valueObj = JSON.parse(message.value);
    // metricsCount.push(Object.keys(valueObj.metrics).length);
  });

  consumer.on('error', function (err) {
    console.log('error', err);
  });

  /*
   * If the consumer gets an `offsetOutOfRange` event, fetch data from the smallest (oldest) offset
   */
  consumer.on('offsetOutOfRange', function (topic) {
    console.log('\n\noffset out of range exception\n\n');
    topic.maxNum = 2;
    offset.fetch([topic], function (err, offsets) {
      var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
      consumer.setOffset(topic.topic, topic.partition, min);
    });
  });
}

if (require.main === module) {
  createKafkaConsumer({
    'duration': argv.duration || 20 * 1000,
    'topic': argv.topic || 'observations.json',
    'zk': argv.zk || 'localhost:2181',
    'groupId': argv.groupId || 'bench',
    'useHLC': argv.useHLC ? true : false
  });
} else {
  module.exports = createKafkaConsumer;
}
master.js:

var cluster = require('cluster');
var os = require('os');
var argv = require('optimist').argv;

var ingestionWorkers = [];
var total = 0;
var start_time;
var finished_workers = 0;

// handle some args
var config = {
  'groupId': argv.groupId || 'bench',
  'concurrency': argv.concurrency || os.cpus().length,
  'duration': argv.duration || 20 * 1000, // milliseconds
  'topic': argv.topic || 'observations.json',
  'zk': argv.zk || 'localhost:2181',
  'useHLC': argv.useHLC ? true : false
};

function printObj(obj) {
  for (var k in obj) {
    console.log(k + ' : ' + obj[k]);
  }
}

if (cluster.isMaster) {
  // Tell us about our config
  console.log('Config: ');
  printObj(config);

  // Fork workers.
  for (var i = 0; i < config.concurrency; i++) {
    // Time how long it takes for all the workers to finish.
    // This should be ~20s, or whatever duration is set in the worker benchmark.
    if (i === 0) {
      start_time = Date.now();
    }
    var worker = cluster.fork();
    ingestionWorkers.push(worker);
    // Bind a message handler to each worker
    worker.on('message', function (results) {
      total += results['Messages Processed'];
      console.log('Results from worker: ');
      printObj(results);
    });
  }

  cluster.on('exit', function (worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
    finished_workers++;
    if (finished_workers === config.concurrency) {
      var duration = ((Date.now() - start_time) / 1000);
      console.log('All workers finished: ' + finished_workers);
      console.log('Runtime: ' + duration);
      console.log('Total messages: ' + total);
      console.log('TPS: ' + (total / duration));
    }
  });
} else {
  require('./kafka_consumer_worker.js')(config);
}