-
-
Save thanpolas/ed14e3db69646fefe268639ae069bbd5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */
var crypto = require('crypto');
var Kafka = require('node-rdkafka');

// Comma-separated bootstrap broker list handed straight to librdkafka.
var BROKER = 'broker-1.service.consul:9092,broker-3.service.consul:9092,broker-2.service.consul:9092';
// var BROKER = 'localhost:9092';

// Timestamp (ms) captured right before produce(); used to measure produce-to-consume latency.
var produceTime = 0;

// Random suffix gives a fresh consumer group each run, so 'auto.offset.reset'
// always replays the topic from 'earliest'.
var consumerGroup = `node-rdkafka-consumer-flow-example-${crypto.randomBytes(20).toString('hex')}`;
/**
 * Returns the current time as an ISO-8601 UTC string, used to prefix log lines.
 * @returns {string}
 */
function getStamp() {
  return new Date().toISOString();
}
console.log(getStamp(), 'Using consumer group:', consumerGroup);

var topicName = 'testNodeKafkaAvro';

var consumer = new Kafka.KafkaConsumer({
  //'debug': 'all',
  'metadata.broker.list': BROKER,
  'group.id': consumerGroup,
  'enable.auto.commit': true,
  'auto.offset.reset': 'earliest',
});

// Echo librdkafka debug output (only emitted when the 'debug' option above is enabled).
consumer.on('event.log', function (log) {
  console.log(getStamp(), log);
});

// Surface every consumer-side error with a timestamp.
consumer.on('error', function (err) {
  console.error(getStamp(), 'Error from consumer');
  console.error(getStamp(), err);
});

// Message tally used to trigger a (currently disabled) manual commit every `numMessages`.
var counter = 0;
var numMessages = 5;

// Subscribing must wait for 'ready'; then start flowing-mode consumption.
consumer.on('ready', function (arg) {
  console.log(getStamp(), `consumer ready.${JSON.stringify(arg)}`);
  consumer.consume([topicName]);
});
// Fires once per consumed message while in flowing mode.
consumer.on('data', function (m) {
  // End-to-end latency relative to the most recent produce() call.
  console.log(getStamp(), 'Produce to Consume difference ms:', Date.now() - produceTime);

  counter += 1;
  // Every `numMessages` messages we would commit manually — left disabled since
  // 'enable.auto.commit' is true. NOTE(review): `counter` is also incremented by the
  // producer's delivery-report handler, so this cadence is shared — confirm intent.
  if (counter % numMessages === 0) {
    console.log(getStamp(), 'calling commit?');
    // consumer.commit(m);
  }

  // Dump the raw message object, then its payload as text.
  console.log(getStamp(), JSON.stringify(m));
  console.log(getStamp(), m.value.toString());
});
// Log partition (un)assignments during consumer-group rebalances.
consumer.on('rebalance', function (e) {
  var assigned = e.code === Kafka.CODES.REBALANCE.PARTITION_ASSIGNMENT;
  if (assigned) {
    console.log(getStamp(), 'Partition assignment', e);
  } else {
    console.log(getStamp(), 'Partition unassignment', e);
  }
});

consumer.on('disconnected', function (arg) {
  console.log(getStamp(), 'consumer disconnected. ' + JSON.stringify(arg));
});

// Kick off the broker connection; the 'ready' handler above fires when it completes.
consumer.connect();

// Uncomment to stop this example after 30s.
// setTimeout(function() {
//   consumer.disconnect();
// }, 30000);
// Producer pointed at the same brokers; 'dr_cb' enables per-message delivery reports.
var producer = new Kafka.Producer({
  //'debug' : 'all',
  'metadata.broker.list': BROKER,
  'dr_cb': true
});

// Echo librdkafka debug output (only emitted when the 'debug' option above is enabled).
producer.on('event.log', function (log) {
  console.log(getStamp(), log);
});

// Surface every producer-side error with a timestamp.
producer.on('error', function (err) {
  console.error(getStamp(), 'Error from producer');
  console.error(getStamp(), err);
});
// Count acknowledged deliveries with a dedicated counter.
// Fix: the original incremented the shared `counter`, which the consumer's 'data'
// handler also increments to decide its commit cadence — so every round-tripped
// message was counted twice and the modulo logic misfired. The original comment
// ("stop this sample after maxMessages") also shows this was meant to be an
// independent producer-side tally.
var deliveredCount = 0;
producer.on('delivery-report', function(err, report) {
  console.log(getStamp(), 'delivery-report: ' + JSON.stringify(report));
  deliveredCount++;
});
// Produce a single test message once the producer has connected.
producer.on('ready', function(arg) {
  console.log(getStamp(), 'producer ready.' + JSON.stringify(arg));

  // Topic object carrying per-topic produce options.
  var topic = producer.Topic(topicName, {
    // Wait for the leader broker to acknowledge the write.
    'request.required.acks': 1
  });

  // Fix: `new Buffer(string)` is deprecated (Node DEP0005; the constructor is a
  // known security hazard) — Buffer.from() is the supported equivalent.
  var value = Buffer.from(JSON.stringify({
    name: 'than',
    long: 666,
  }));
  var key = 'key';
  // -1 lets librdkafka pick the partition via the default partitioner.
  var partition = -1;

  produceTime = Date.now();
  // setTimeout(function() {
  producer.produce(topic, partition, value, key);
  console.log(getStamp(), 'Message produced');
  // }, 2000);

  // Keep polling so the delivery-report callbacks actually fire.
  // NOTE(review): this interval is never cleared, so the process will not exit
  // on its own — confirm that is intended for this example.
  setInterval(function() {
    producer.poll();
  }, 1000);
});
producer.on('disconnected', function (arg) {
  console.log(getStamp(), 'producer disconnected. ' + JSON.stringify(arg));
});

// Kick off the producer connection; its 'ready' handler fires once connected.
producer.connect();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment