Skip to content

Instantly share code, notes, and snippets.

@JasonGiedymin
Created May 21, 2015 05:39
Show Gist options
  • Save JasonGiedymin/a09026655df5a53e3da5 to your computer and use it in GitHub Desktop.
Save JasonGiedymin/a09026655df5a53e3da5 to your computer and use it in GitHub Desktop.
High-Level Kafka Producer Wrapper :-)
'use strict';
// Imports
var bunyan = require('bunyan');
var Promise = require('bluebird');
var lodash = require('lodash');
var kafka = require('kafka-node');
// Module-level shared values

// Logger used by every function in this module.
var log = bunyan.createLogger({name: 'kafka'});

// Constructors pulled off the kafka-node module.
var Client = kafka.Client;
var KeyedMessage = kafka.KeyedMessage;
var HighLevelProducer = kafka.HighLevelProducer;
// NOTE(review): Producer does not appear to be used by any live code visible here.
var Producer = kafka.Producer;
// Wraps a promisified kafka-node HighLevelProducer.
//
// zkhost     - connection string handed straight to the kafka-node Client
//              (presumably a ZooKeeper host:port -- confirm against callers).
// partitions - stored as-is; sent as the `partition` field by sendMessageRaw.
// attributes - stored as-is; sent as the `attributes` field of every payload.
//
// The instance starts with `ready === false`; the flag flips to true once the
// producer emits its 'ready' event.
function Kafka(zkhost, partitions, attributes) {
  var instance = this; // keep this reference for closures

  this.zkhost = zkhost;
  this.partitions = partitions;
  this.attributes = attributes;
  this.client = new Client(zkhost);
  this.producer = Promise.promisifyAll(new HighLevelProducer(this.client, { requireAcks: 1 }));
  this.ready = false;

  this.producer.on('ready', function () {
    log.info('kafka ready');
    instance.ready = true;
  });

  this.producer.on('error', function (err) {
    // Log at error level, and run the value through bunyan's error serializer:
    // a raw Error has no enumerable fields and would otherwise be written to
    // the JSON log as an empty object, hiding the message and stack.
    log.error({error: bunyan.stdSerializers.err(err)}, 'kafka error');
  });
}
// Delay, in milliseconds, to wait before readiness check number `n`.
// Grows with the square root of `n` so retries stay quick:
// floor(n^0.5 * 250). The delays for n = 1 through 10 add up to roughly
// 5600ms in total, which is already a very long time to wait.
function backoff(n) {
  var exponent = 0.5;
  var scaleMs = 250;
  var delay = Math.pow(n, exponent) * scaleMs;
  return Math.floor(delay);
}
// Schedule `cb` to run once after `timing` milliseconds. Returns nothing.
function queue(cb, timing) {
  setTimeout(cb, timing);
}
// Polls `this.ready` until it becomes truthy or the retry limit is hit.
// Must be invoked with a Kafka instance as `this`.
//
// Returns a promise that:
//   - resolves with the value of `this.ready` as soon as a check sees it truthy;
//   - rejects with a message string after 10 unsuccessful checks. (The
//     rejection value stays a plain string so existing callers that inspect
//     it keep working.)
//
// The first check runs synchronously; each later check is scheduled through
// queue() after a backoff(attempt) delay.
function isReady() {
  var instance = this;
  var retryLimit = 10;

  return new Promise(function (resolve, reject) {
    // `attempt` is the 1-based number of the check being performed.
    function check(attempt) {
      if (instance.ready) {
        log.info({checkCount: attempt, ready: instance.ready}, 'Kafka connection made and is ready!');
        resolve(instance.ready);
        return;
      }

      // Out of attempts: fail without announcing a retry that will never run.
      if (attempt >= retryLimit) {
        var msg = 'Could not make connection with Kafka, tried ' + attempt + ' times.';
        log.error({checkCount: attempt, ready: instance.ready}, msg);
        reject(msg);
        return;
      }

      var timing = backoff(attempt);
      log.warn({checkCount: attempt, ready: instance.ready}, 'Kafka not ready, checking again in ' + timing + 'ms...');
      queue(function () {
        check(attempt + 1);
      }, timing);
    }

    check(1);
  });
}
// Success handler for a producer send: records the outcome in the log and
// passes the producer's result through unchanged.
//
// action - name of the operation that succeeded (logged as `action`).
// result - value the send resolved with (logged as `topicCount`, then returned).
function kafkaResult(action, result) {
  var fields = {
    action: action,
    status: 'successful',
    topicCount: result
  };
  log.info(fields, 'message sent successfully');
  return result;
}
// Failure handler for a producer send: logs the failure, then rethrows it
// wrapped in a descriptive Error so the calling promise chain rejects.
//
// action - name of the operation that failed (logged as `action`).
// err    - whatever the send rejected with.
//
// Always throws an Error whose message ends with the failure detail and whose
// `cause` property holds the original `err`.
function kafkaError(action, err) {
  // Reduce the failure to a string: an Error object would serialize to `{}`
  // in the JSON log, and string concatenation needs text anyway.
  var detail = (err && err.message) ? err.message : String(err);

  // Here we catch the error so we can log it here
  // I expect to be here often with debugging.
  log.error({action: action, status: 'error', error: detail}, 'message error');

  var wrapped = new Error('Message could not be sent to Kafka: ' + detail);
  wrapped.cause = err; // keep the original failure reachable for callers
  throw wrapped;
}
// Sends a single message with explicit partition information.
// Must be invoked with a Kafka instance as `this`.
//
// When `partitionKey` is neither undefined nor null the message is wrapped in
// a KeyedMessage built from that key; otherwise the message is sent as given.
// The payload carries `this.partitions` as `partition` and `this.attributes`
// as `attributes`.
//
// Returns the producer's promise with kafkaResult attached as the success
// handler and kafkaError attached through .error().
function sendMessageRaw(topic, message, partitionKey) {
  var self = this;
  var hasKey = !(lodash.isUndefined(partitionKey) || lodash.isNull(partitionKey));
  var entry = hasKey ? new KeyedMessage(partitionKey, message) : message;

  var payload = {
    topic: topic,
    partition: self.partitions,
    messages: [entry],
    attributes: self.attributes
  };

  var onSuccess = function (result) {
    return kafkaResult.call(self, 'sendMessageRaw', result);
  };
  var onFailure = function (err) {
    return kafkaError.call(self, 'sendMessageRaw', err);
  };

  return self.producer.sendAsync([payload]).then(onSuccess).error(onFailure);
}
// Simpler send for the high-level producer: no partition and no message key.
// Must be invoked with a Kafka instance as `this`.
//
// Sends one payload holding `topic`, the single `message`, and
// `this.attributes`. Returns the producer's promise with kafkaResult attached
// as the success handler and kafkaError attached through .error().
function sendMessage(topic, message) {
  var self = this;
  var payloads = [
    {
      topic: topic,
      messages: [message],
      attributes: self.attributes
    }
  ];

  var onSuccess = function (result) {
    return kafkaResult.call(self, 'sendMessage', result);
  };
  var onFailure = function (err) {
    return kafkaError.call(self, 'sendMessage', err);
  };

  return self.producer.sendAsync(payloads).then(onSuccess).error(onFailure);
}
// Public entry point for a keyed/partitioned send.
//
// topic, message - required; each must be neither undefined nor null
//                  (other falsy values such as '' or 0 are accepted).
// partitionKey   - optional; forwarded untouched to the module-level
//                  sendMessageRaw.
//
// Returns the promise produced by the module-level sendMessageRaw, or a
// promise rejected with a message string when topic or message is missing.
Kafka.prototype.sendMessageRaw = function(topic, message, partitionKey) {
  var isPresent = function(value) {
    return !lodash.isUndefined(value) && !lodash.isNull(value);
  };

  // `every` is available in both lodash 3.x and 4.x, unlike the `all` alias.
  if (!lodash.every([topic, message], isPresent)) {
    return Promise.reject('must supply a topic and message when calling sendMessageRaw()');
  }

  return sendMessageRaw.call(this, topic, message, partitionKey);
};
// Public entry point for a plain (unkeyed) send.
//
// topic, message - required; each must be neither undefined nor null
//                  (other falsy values such as '' or 0 are accepted).
//
// Returns the promise produced by the module-level sendMessage, or a promise
// rejected with a message string when topic or message is missing.
Kafka.prototype.sendMessage = function(topic, message) {
  var isPresent = function(value) {
    return !lodash.isUndefined(value) && !lodash.isNull(value);
  };

  // `every` is available in both lodash 3.x and 4.x, unlike the `all` alias.
  if (!lodash.every([topic, message], isPresent)) {
    return Promise.reject('must supply a topic and message when calling sendMessage()');
  }

  return sendMessage.call(this, topic, message);
};
// Public readiness check: runs the module-level isReady with this instance as
// its `this` and returns whatever it returns.
Kafka.prototype.isReady = function() {
  var self = this;
  return isReady.apply(self, []);
};
// Expose the Kafka constructor under the `instance` key of this module's exports.
module.exports.instance = Kafka;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment