Skip to content

Instantly share code, notes, and snippets.

@vaibhav93
Forked from shubhi1407/consumer.js
Created June 8, 2018 20:41
Show Gist options
  • Save vaibhav93/59f29fb289559c20b97cffd240e3333a to your computer and use it in GitHub Desktop.
Save vaibhav93/59f29fb289559c20b97cffd240e3333a to your computer and use it in GitHub Desktop.
var Kafka = require('node-rdkafka');
var producer = require('./producer');
var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': 'gc-mskafka1002.pp-devqa-ms-thirdparty.us-central1.gcp.dev.paypalinc.com:9092',
'group.id': 'node-rdkafka-consumer-flow-example',
'enable.auto.commit': false,
'socket.keepalive.enable': true,
'reconnect.backoff.jitter.ms': 500
});
var topicName = 'test_partitions';
//logging debug messages, if debug is enabled
consumer.on('event.log', function (log) {
console.log(log);
});
//logging all errors
consumer.on('event.error', function (err) {
console.error('Error from consumer');
console.error(err);
});
//counter to commit offsets every numMessages are received
var counter = 0;
var numMessages = 1;
consumer.on('ready', function (arg) {
consumer.isReady = true;
consumer.subscribe([topicName]);
console.log('consumer ready.' + JSON.stringify(arg));
//start consuming messages
consumer.consume(1);
});
if(consumer.isReady){
consumer.commit();
consumer.consume(1);
}
consumer.on('data', function (m) {
counter++;
if (counter % numMessages === 0) {
console.log('calling commit');
consumer.commit(m);
}
// var topicName1 = 'test_partitions';
// var topicName2 = 'test_topic2';
// var partition = -1;
//var buff = Buffer.from(m.value.toString());
// Output the actual message contents
//console.log(JSON.stringify(m));
console.log(m.value.toString());
console.log('********'+m.value%2+'********');
// switch (m.value % 2) {
// case 0:
// producer.producer.produce(topicName1, partition, buff, counter.toString());
// break;
// case 1:
// producer.producer.produce(topicName2, partition, buff, counter.toString());
// }
});
producer.producer.on('delivery-report', function (err, report) {
// Report of delivery statistics here:
console.log('delivery-report: ' + JSON.stringify(report));
});
consumer.on('disconnected', function (arg) {
console.log('consumer disconnected. ' + JSON.stringify(arg));
});
//starting the consumer
consumer.connect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment