Skip to content

Instantly share code, notes, and snippets.

@jeremy-brenner
Created September 23, 2018 18:01
Show Gist options
  • Save jeremy-brenner/07cbc5fc165bbbdda7c10cef0eed7a5f to your computer and use it in GitHub Desktop.
Save jeremy-brenner/07cbc5fc165bbbdda7c10cef0eed7a5f to your computer and use it in GitHub Desktop.
Kafka Node.js consumer (kafka-node + RxJS)
// Example usage: obtain the observable for the 'support_calls' topic and hand
// every emitted value to dostuff.with (dostuff is not defined in this snippet).
const kafkaConsumer = require('./kafka-consumer.js');
kafkaConsumer
  .observableFor('support_calls')
  .subscribe(function handleSupportCalls(data) {
    return dostuff.with(data);
  });
{
"kafkaHost": "10.194.91.83:9092",
"groupId": "whatever",
"sessionTimeout": 15000
}
const kafka = require('kafka-node');
const kafkaConfig = require('./kafka-config.json');
const { fromEvent } = require('rxjs');
const { map } = require("rxjs/operators");
// Options passed to every ConsumerGroup: all settings from kafka-config.json
// plus a rebalance hook that does nothing except invoke the callback it is
// given.
// NOTE(review): the original comment says this hook may also be null —
// confirm against the kafka-node documentation before changing it.
const options = {
  ...kafkaConfig,
  onRebalance(isAlreadyMember, done) {
    done();
  },
};
/**
 * Builds an observable of decoded messages for a single Kafka topic.
 *
 * A new kafka.ConsumerGroup is created for the topic on every call, using the
 * module-level `options`. Each "message" event emitted by that group is
 * JSON-parsed twice, then passed through invertObject and arrayify.
 *
 * NOTE(review): the double JSON.parse presumably means the producer sends a
 * JSON string that itself contains JSON — confirm against the producer.
 * A malformed payload makes JSON.parse throw inside the pipeline.
 *
 * @param {string} topic - Name of the topic to consume.
 * @returns {Observable} Observable emitting one array of records per message.
 */
// Declared with `const`: the bare assignment created an implicit global and
// throws a ReferenceError under strict mode / ES modules.
const observableFor = (topic) => {
  const consumerGroup = new kafka.ConsumerGroup(options, [topic]);
  return fromEvent(consumerGroup, "message").pipe(
    map((item) => JSON.parse(JSON.parse(item.value))),
    map(invertObject),
    map(arrayify)
  );
};
module.exports = { observableFor };
/**
 * Swaps the two key levels of a nested object, so that
 * `{ field: { id: value } }` becomes `{ id: { field: value } }`.
 *
 * Only own properties of the result are consulted or written. Keys such as
 * "__proto__" or "constructor" arriving in a message therefore become
 * ordinary data properties instead of reaching Object.prototype or the
 * global Object through inherited members.
 *
 * @param {Object} inputObject - Object whose values are themselves objects.
 * @returns {Object} A new object keyed by the inner keys of the input.
 */
function invertObject(inputObject) {
  const hasOwn = Object.prototype.hasOwnProperty;
  // defineProperty creates a plain data property even for "__proto__",
  // where normal assignment would invoke the prototype setter instead.
  const setOwn = (target, key, value) => {
    Object.defineProperty(target, key, {
      value,
      writable: true,
      enumerable: true,
      configurable: true,
    });
  };

  const outputObject = {};
  Object.keys(inputObject).forEach((innerKey) => {
    const column = inputObject[innerKey];
    Object.keys(column).forEach((outerKey) => {
      // An own-property check is required: a truthiness test also sees
      // inherited members such as `constructor`.
      if (!hasOwn.call(outputObject, outerKey)) {
        setOwn(outputObject, outerKey, {});
      }
      setOwn(outputObject[outerKey], innerKey, column[outerKey]);
    });
  });
  return outputObject;
}
/**
 * Flattens an object of records into an array of records.
 *
 * Each key of the input becomes the `id` property of one element, followed by
 * the own enumerable properties of the value stored under that key. A value
 * that has its own `id` property overrides the key.
 *
 * @param {Object} inputObject - Object whose values are record objects.
 * @returns {Array<Object>} One record per key, in Object.keys order.
 */
function arrayify(inputObject) {
  const records = [];
  for (const id of Object.keys(inputObject)) {
    const fields = inputObject[id];
    records.push({ id, ...fields });
  }
  return records;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment