Skip to content

Instantly share code, notes, and snippets.

@johndstein
Created October 8, 2014 17:20
Show Gist options
  • Save johndstein/78246f10c6d9cfd3ea63 to your computer and use it in GitHub Desktop.
Save johndstein/78246f10c6d9cfd3ea63 to your computer and use it in GitHub Desktop.
kafka.js
#!/usr/bin/env node
var kafka = require('kafka-node');
var winston = require('winston');
var _ = require('lodash');
// Consumer: a thin wrapper around a single kafka-node Consumer.
// https://github.com/SOHU-Co/kafka-node#consumer
//
// By design each instance handles exactly one topic, one groupId and one
// partition.
//
// For now commits are always automatic and consumption uses the offset from
// the commit.
//
// The options object is one flattened object that combines what is documented
// as separate constructor arguments / option objects here:
//   https://github.com/SOHU-Co/kafka-node#consumer
//   https://github.com/alexguan/node-zookeeper-client#client-createclientconnectionstring-options
//
// The defaults appear in the constructor below. Whatever the caller passes,
// three settings are always overwritten:
//
//   1. client.clientId     -> 'HIP_INGEST'
//   2. consumer.autoCommit -> true
//   3. consumer.fromOffset -> false
//
// Setting logSilly to true additionally console.logs every received message.
function Consumer(options) {
  var self = this;

  // A fresh defaults object is built on every call because _.merge writes
  // into its first argument.
  var defaults = {
    "logSilly": false,
    "client": {
      "connectionString": "localhost:2181",
      "clientId": "HIP_INGEST",
      "sessionTimeout": 30000,
      "spinDelay": 1000,
      "retries": 0
    },
    "consumer": {
      "topic": "test",
      "groupId": "test",
      "partition": 0,
      "offset": 0,
      "autoCommit": true,
      "autoCommitIntervalMs": 5000,
      "fetchMaxWaitMs": 100,
      "fetchMinBytes": 1,
      "fetchMaxBytes": 10240,
      "fromOffset": false
    }
  };
  var opts = _.merge(defaults, options);

  // Non-negotiable settings: caller-supplied values are discarded.
  opts.client.clientId = 'HIP_INGEST';
  opts.consumer.autoCommit = true;
  opts.consumer.fromOffset = false;
  self.options = opts;

  var consumerOpts = opts.consumer;
  self.key = Consumer.key(consumerOpts.topic, consumerOpts.groupId, consumerOpts.partition);
  winston.info('Instantiating ' + self.toString(true));

  // Every consumer needs its OWN client instance; sharing one client between
  // consumers does not work.
  self.client = new kafka.Client(opts.client.connectionString, opts.client.clientId, opts.client);

  // kafka-node's HighLevelConsumer was found to be broken, so the plain
  // Consumer is used with a single topic/partition payload.
  // https://github.com/SOHU-Co/kafka-node#highlevelconsumer
  var payload = {
    topic: consumerOpts.topic,
    partition: consumerOpts.partition,
    offset: consumerOpts.offset
  };
  self.consumer = new kafka.Consumer(self.client, [payload], consumerOpts);

  self.consumer.on('error', function logKafkaConsumerErrors(err) {
    winston.error('Kafka consumer error: ' + err + '. Consumer: ' + self.toString(true));
  });

  if (opts.logSilly) {
    self.consumer.on('message', function logSillyMessage(msg) {
      console.log('Kafka consumer silly logging', JSON.stringify(msg, null, 2));
    });
  }
}

// Builds the registry key identifying a consumer: "topic|groupId|partition".
Consumer.key = function key(topic, groupId, partition) {
  var SEP = '|';
  return topic + SEP + groupId + SEP + partition;
};

// Human-readable description: "Kafka consumer <key> <options as JSON>".
// Pass a truthy `pretty` for indented JSON.
//
// Both variants are computed on the first call and cached on the instance.
// Options are expected not to change after construction; if someone mutates
// them anyway, the cached strings go stale.
Consumer.prototype.toString = function toString(pretty) {
  var label = 'Kafka consumer ' + this.key + ' ';
  if (!this.stringified) {
    this.stringifiedPretty = label + JSON.stringify(this.options, null, 2);
    this.stringified = label + JSON.stringify(this.options);
  }
  return pretty ? this.stringifiedPretty : this.stringified;
};
// Registry: creates one Consumer per config and lets callers attach
// 'message' handlers to consumers by topic and groupId.
//
// Consumers are stored by Consumer key (topic|groupId|partition). If two
// configs resolve to the same key, the later config wins (as before) and the
// earlier Consumer is closed. Otherwise it would keep running and hold its own
// client connection open while being unreachable from close().
//
// This may need methods like close all, add, remove, etc.
function Registry(configs) {
  this.configs = configs || [];
  this.consumers = {};
  this.configs.forEach(function (cfg) {
    var consumer = new Consumer(cfg);
    var replaced = this.consumers[consumer.key];
    if (replaced) {
      winston.warn('Duplicate kafka consumer config for ' + consumer.key +
        '; closing the previously created consumer.');
      replaced.consumer.close();
    }
    this.consumers[consumer.key] = consumer;
  }, this);
}

// Closes every registered consumer so the program can exit gracefully.
// `force` is passed straight through to each kafka consumer's close().
Registry.prototype.close = function close(force) {
  this.listConsumers().forEach(function (c) {
    c.consumer.close(force);
  });
};

// Returns an array of the registered consumers whose topic AND groupId both
// match. When `topic` is falsy, returns every registered consumer.
Registry.prototype.listConsumers = function listConsumers(topic, groupId) {
  var consumers = this.consumers;
  var all = Object.keys(consumers).map(function (key) {
    return consumers[key];
  });
  if (!topic) {
    return all;
  }
  return all.filter(function (c) {
    return c.options.consumer.topic === topic && c.options.consumer.groupId === groupId;
  });
};

// Registers `fn` as a 'message' handler on every consumer of the given topic
// and groupId.
//
// Not every consumer is necessarily configured to run, so registering a
// handler for a topic/groupId with no consumer is allowed: it is logged and
// otherwise ignored.
Registry.prototype.onMessage = function onMessage(topic, groupId, fn) {
  var consumers = this.listConsumers(topic, groupId);
  if (consumers.length === 0) {
    winston.info('No kafka consumer registered for topic, ' + topic + ', groupId, ' + groupId);
  }
  consumers.forEach(function (consumer) {
    winston.info('Registering kafka handler for topic, ' + topic + ', groupId, ' + groupId);
    consumer.consumer.on('message', fn);
  });
};
module.exports = Registry;

// When this file is executed directly (e.g. `./kafka.js`), run a small demo
// that consumes 'job_result' for five seconds. Nothing runs when the file is
// loaded as a module.
//
// `require.main === module` is used rather than `!module.parent`.
// module.parent is deprecated, and it is `undefined` (not an object) when the
// file is loaded from an ES module or the REPL. That would wrongly start the
// demo in those cases.
if (require.main === module) {
  var reg = new Registry([{
    logSilly: true,
    consumer: {
      topic: 'job_result',
      groupId: 'job_result'
    }
  }]);
  reg.onMessage('job_result', 'job_result', function (msg) {
    console.log('HERE is a REG message handler', msg);
  });
  // Close all consumers after 5 seconds so the demo exits on its own.
  setTimeout(function () {
    reg.close();
  }, 5000);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment