Skip to content

Instantly share code, notes, and snippets.

@tilomitra
Created November 6, 2017 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tilomitra/8a79a2262d5d8ae836a8642c5b43deaa to your computer and use it in GitHub Desktop.
Save tilomitra/8a79a2262d5d8ae836a8642c5b43deaa to your computer and use it in GitHub Desktop.
Kafka Consumer in NodeJS
import kafka from "kafka-node"
const client = new kafka.Client("http://localhost:2181");
const topics = [
{
topic: "webevents.dev"
}
];
const options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: "buffer"
};
const consumer = new kafka.HighLevelConsumer(client, topics, options);
consumer.on("message", function(message) {
// Read string into a buffer.
var buf = new Buffer(message.value, "binary");
var decodedMessage = JSON.parse(buf.toString());
//Events is a Sequelize Model Object.
return Events.create({
id: decodedMessage.id,
type: decodedMessage.type,
userId: decodedMessage.userId,
sessionId: decodedMessage.sessionId,
data: JSON.stringify(decodedMessage.data),
createdAt: new Date()
});
});
consumer.on("error", function(err) {
console.log("error", err);
});
process.on("SIGINT", function() {
consumer.close(true, function() {
process.exit();
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment