Skip to content

Instantly share code, notes, and snippets.

@jacace
Created February 1, 2021 12:37
Show Gist options
  • Save jacace/a6ee766a8a40cf72e28e125a50a2bad3 to your computer and use it in GitHub Desktop.
Save jacace/a6ee766a8a40cf72e28e125a50a2bad3 to your computer and use it in GitHub Desktop.
Kafka consume message
//Sample kafka client in nodejs
'use strict';
require('dotenv').config()
//kafka init starts
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: process.env.consumerid,
brokers: [ process.env.bootstrapservers ],
ssl: true,
sasl: {
mechanism: process.env.saslmechanisms,
username: process.env.saslusername,
password: process.env.saslpassword
},
})
//kafka init ends
//consumer starts
const consumer = kafka.consumer({ groupId: process.env.groupid })
consumer.connect()
consumer.subscribe({
topic: process.env.topic, fromBeginning: true
})
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
//consumer end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment