Skip to content

Instantly share code, notes, and snippets.

@fsck-mount
Created August 21, 2017 07:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fsck-mount/6fb3cdebbc4a2ffd046845bcd8de4abe to your computer and use it in GitHub Desktop.
Save fsck-mount/6fb3cdebbc4a2ffd046845bcd8de4abe to your computer and use it in GitHub Desktop.
Consumer with StartBackgroundTransaction
// New Relic agent; used below to wrap message handling in background
// transactions and to report errors.
const nr = require('newrelic');
// Disabled: registration of a custom New Relic message instrumentation for
// the 'kafka-java-bridge' module. Left in place for reference only.
/* const instrumentKafkaClient = require('../instruments').instrumentKafkaClient;
nr.instrumentMessages(
'kafka-java-bridge',
instrumentKafkaClient,
kafkaInstrumentationError = (err) => {
console.error(err.message, err.stack);
process.exit(-1);
}
); */
// High-level consumer class exported by kafka-java-bridge.
const HLConsumer = require('kafka-java-bridge').HLConsumer;
// ZooKeeper URL, read once from the environment at module load and passed to
// the consumer as `zookeeperUrl`.
const zkUrl = process.env.ZK_URL;
module.exports = function kafkaConsumer() {
let consumer;
return {
consume(_topics, cb, consumerConfig) {
let topics = _topics;
if (typeof cb !== 'function') {
setTimeout(() => {
process.exit();
}, 300);
}
if (topics.constructor !== Array) topics = [topics];
const consumerOptions = {
zookeeperUrl: zkUrl,
groupId: process.env.KAFKA_GROUP_ID,
getMetadata: true,
/*
Following are optional
*/
topics,
serverPort: 3042,
threadCount: parseInt(process.env.KAFKA_WORKER_COUNT || 1, 10),
properties: { 'rebalance.max.retries': '3' }
};
Object.assign(consumerOptions, consumerConfig);
consumer = new HLConsumer(consumerOptions);
consumer.start((err) => {
if (err) {
setTimeout(() => {
process.exit();
}, 300);
} else {
console.log('Consumer Started');
}
});
consumer.on('error', (err) => {
if (err) {
setTimeout(() => {
process.exit();
}, 300);
}
});
consumer.on('message', (message, metadata) => {
const topic = metadata.topic;
const group = topic.substring(0, topic.length - 2);
nr.startBackgroundTransaction(topic, group, () => {
const tx = nr.getTransaction();
cb(JSON.parse(message.toString()), topic, () => {
tx.end();
});
});
});
},
stop() {
consumer.stop((err) => {
if (err) {
nr.noticeError(err, [gs]);
process.exit();
}
});
console.log('Consumer Stopped');
},
doesExist() {
if (consumer) {
return true;
}
return false;
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment