Skip to content

Instantly share code, notes, and snippets.

@thanpolas
Last active January 30, 2017 10:55
Show Gist options
  • Save thanpolas/ed14e3db69646fefe268639ae069bbd5 to your computer and use it in GitHub Desktop.
Save thanpolas/ed14e3db69646fefe268639ae069bbd5 to your computer and use it in GitHub Desktop.
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var crypto = require('crypto');
var Kafka = require('node-rdkafka');
var BROKER = 'broker-1.service.consul:9092,broker-3.service.consul:9092,broker-2.service.consul:9092';
// var BROKER = 'localhost:9092';
var produceTime = 0;
var consumerGroup = 'node-rdkafka-consumer-flow-example-' + crypto.randomBytes(20).toString('hex');
function getStamp() {
var dt = new Date();
return dt.toISOString();
}
console.log(getStamp(), 'Using consumer group:', consumerGroup);
var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': BROKER,
'group.id': consumerGroup,
'enable.auto.commit': true,
'auto.offset.reset': 'earliest',
});
var topicName = 'testNodeKafkaAvro';
//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
console.log(getStamp(), log);
});
//logging all errors
consumer.on('error', function(err) {
console.error(getStamp(), 'Error from consumer');
console.error(getStamp(), err);
});
//counter to commit offsets every numMessages are received
var counter = 0;
var numMessages = 5;
consumer.on('ready', function(arg) {
console.log(getStamp(), 'consumer ready.' + JSON.stringify(arg));
//start consuming messages
consumer.consume([topicName]);
});
consumer.on('data', function(m) {
var diff = Date.now() - produceTime;
console.log(getStamp(), 'Produce to Consume difference ms:', diff);
counter++;
//committing offsets every numMessages
if (counter % numMessages === 0) {
console.log(getStamp(), 'calling commit?');
// consumer.commit(m);
}
// Output the actual message contents
console.log(getStamp(), JSON.stringify(m));
console.log(getStamp(), m.value.toString());
});
consumer.on('rebalance', function(e) {
if (e.code === Kafka.CODES.REBALANCE.PARTITION_ASSIGNMENT) {
console.log(getStamp(), 'Partition assignment', e);
} else {
console.log(getStamp(), 'Partition unassignment', e);
}
});
consumer.on('disconnected', function(arg) {
console.log(getStamp(), 'consumer disconnected. ' + JSON.stringify(arg));
});
//starting the consumer
consumer.connect();
//stopping this example after 30s
// setTimeout(function() {
// consumer.disconnect();
// }, 30000);
var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': BROKER,
'dr_cb': true //delivery report callback
});
//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
console.log(getStamp(), log);
});
//logging all errors
producer.on('error', function(err) {
console.error(getStamp(), 'Error from producer');
console.error(getStamp(), err);
});
//counter to stop this sample after maxMessages are sent
producer.on('delivery-report', function(err, report) {
console.log(getStamp(), 'delivery-report: ' + JSON.stringify(report));
counter++;
});
//Wait for the ready event before producing
producer.on('ready', function(arg) {
console.log(getStamp(), 'producer ready.' + JSON.stringify(arg));
//Create a Topic object with any options our Producer
//should use when producing to that topic.
var topic = producer.Topic(topicName, {
// Make the Kafka broker acknowledge our message (optional)
'request.required.acks': 1
});
var value = new Buffer(JSON.stringify({
name: 'than',
long: 666,
}));
var key = 'key';
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
produceTime = Date.now();
// setTimeout(function() {
producer.produce(topic, partition, value, key);
console.log(getStamp(), 'Message produced');
// }, 2000);
//need to keep polling for a while to ensure the delivery reports are received
setInterval(function() {
producer.poll();
}, 1000);
});
producer.on('disconnected', function(arg) {
console.log(getStamp(), 'producer disconnected. ' + JSON.stringify(arg));
});
//starting the producer
producer.connect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment