Created
January 6, 2022 14:44
-
-
Save pietheinstrengholt/39f3fed3519e650c310cb563932a0ac4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Transform = require('stream').Transform;
var Kafka = require('node-rdkafka'); //See more info: https://github.com/Blizzard/node-rdkafka
// Kafka consumer configured for an Event Hubs Kafka endpoint (port 9093,
// SASL_SSL with the PLAIN mechanism).
// The broker list and the connection string may be supplied through the
// KAFKA_BROKER_LIST and EVENTHUB_CONNECTION_STRING environment variables so
// that credentials do not have to be edited into this file; when they are
// unset, the literals below are used (REPLACE them with your own values).
// NOTE(review): auto-commit is disabled and nothing in this script commits
// offsets -- confirm that is intended before using this beyond a demo.
const consumer = new Kafka.KafkaConsumer({
  //'debug' : 'all',
  'metadata.broker.list':
    process.env.KAFKA_BROKER_LIST ||
    'atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net:9093', //REPLACE
  'group.id': 'nodejs-cg', //The default consumer group for EventHubs is $Default
  'socket.keepalive.enable': true,
  'enable.auto.commit': false,
  'security.protocol': 'SASL_SSL',
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': '$ConnectionString', //do not replace $ConnectionString
  'sasl.password':
    process.env.EVENTHUB_CONNECTION_STRING ||
    'Endpoint=sb://atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net/;SharedAccessKeyName=AlternateSharedAccessKey;SharedAccessKey=WrIVbXQnYutxKXsvmfP+Wz4G4OLKHjDtuuR8sdfsE1=', //REPLACE
});
// Topic to subscribe to. This is Purview's default topic name, see
// https://docs.microsoft.com/en-us/azure/purview/manage-kafka-dotnet
const topicName = 'ATLAS_ENTITIES';
// Print client log events to stdout. These are only expected when the
// 'debug' option in the consumer configuration is enabled.
consumer.on('event.log', (log) => {
  console.log(log);
});
// Report every error event from the consumer on stderr: a fixed banner line
// followed by the error object itself.
consumer.on('event.error', (err) => {
  console.error('Error from consumer');
  console.error(err);
});
// When the consumer emits 'ready', log the event argument, subscribe to the
// configured topic and start consuming.
consumer.on('ready', (arg) => {
  console.log(`consumer ready.${JSON.stringify(arg)}`);
  consumer.subscribe([topicName]);
  // Calling consume() with no arguments starts consumption; messages are
  // handled by the 'data' listener.
  consumer.consume();
});
// Handle one consumed message: decode its value as JSON and print the result.
// A payload that is not valid JSON is reported on stderr and skipped, so a
// single malformed message cannot throw out of the event callback and take
// the whole consumer down.
consumer.on('data', (event) => {
  let payload;
  try {
    // event.value is coerced to a string by JSON.parse before parsing.
    payload = JSON.parse(event.value);
  } catch (err) {
    console.error('Failed to parse message value as JSON');
    console.error(err);
    return;
  }
  // Output the actual message contents
  console.log(payload);
});
// Log the event argument when the consumer reports it has disconnected.
consumer.on('disconnected', (arg) => {
  console.log(`consumer disconnected. ${JSON.stringify(arg)}`);
});
// Start the consumer. Subscription and consumption are kicked off from the
// 'ready' event handler.
consumer.connect();
// Stop this example after 300 s (300000 ms) by disconnecting the consumer.
setTimeout(() => {
  consumer.disconnect();
}, 300000);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment