@theaungmyatmoe
Created April 2, 2024 14:56
Kafka message parsing from a service
const { CompressionTypes, CompressionCodecs, Kafka } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')

// Register the Snappy codec so kafkajs can decompress Snappy-compressed batches.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092'],
})

const consumer = kafka.consumer({ groupId: 'd-groussp' })
const main = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'wallets', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // message.value is a Buffer: decode it to a string, then parse the JSON once.
      console.log({
        value: JSON.parse(message.value.toString()),
      })
    },
  })
}

main()
  .then(() => console.log('Consumer started'))
  .catch(console.error)
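
The `eachMessage` handler above assumes every record carries a non-null, valid JSON payload. In practice, `message.value` can be `null` (a tombstone record), and calling `JSON.parse` on malformed text throws inside the handler. The following is a minimal sketch of a more defensive parser. `parseMessageValue` is a hypothetical helper that is not part of the original gist, and falling back to the raw text is an assumed policy, not something the gist prescribes.

```javascript
// Hypothetical helper (not in the original gist): decode a Kafka message value
// defensively instead of letting JSON.parse throw inside eachMessage.
function parseMessageValue(value) {
  // Tombstone records carry a null value; pass that through unchanged.
  if (value === null || value === undefined) return null

  // kafkajs delivers values as Buffers; decode to text before parsing.
  const text = value.toString('utf8')

  try {
    return JSON.parse(text)
  } catch (err) {
    // Assumed policy: if the payload is not valid JSON, return the raw text.
    return text
  }
}
```

Inside the handler, the logging call would then read `console.log({ value: parseMessageValue(message.value) })`. Returning the raw text keeps the consumer moving past a bad record; a service that must not silently accept malformed payloads could rethrow or route the record elsewhere instead.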