Skip to content

Instantly share code, notes, and snippets.

@pietheinstrengholt
Created January 6, 2022 14:44
Show Gist options
  • Save pietheinstrengholt/39f3fed3519e650c310cb563932a0ac4 to your computer and use it in GitHub Desktop.
Save pietheinstrengholt/39f3fed3519e650c310cb563932a0ac4 to your computer and use it in GitHub Desktop.
var Transform = require('stream').Transform;
var Kafka = require('node-rdkafka'); //See more info: https://github.com/Blizzard/node-rdkafka
var consumer = new Kafka.KafkaConsumer({
//'debug' : 'all',
'metadata.broker.list': 'atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net:9093', //REPLACE
'group.id': 'nodejs-cg', //The default consumer group for EventHubs is $Default
'socket.keepalive.enable': true,
'enable.auto.commit': false,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '$ConnectionString', //do not replace $ConnectionString
'sasl.password': 'Endpoint=sb://atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net/;SharedAccessKeyName=AlternateSharedAccessKey;SharedAccessKey=WrIVbXQnYutxKXsvmfP+Wz4G4OLKHjDtuuR8sdfsE1=', //REPLACE
});
//Purview's default topic name: https://docs.microsoft.com/en-us/azure/purview/manage-kafka-dotnet
var topicName = 'ATLAS_ENTITIES';
//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
console.log(log);
});
//logging all errors
consumer.on('event.error', function(err) {
console.error('Error from consumer');
console.error(err);
});
consumer.on('ready', function(arg) {
console.log('consumer ready.' + JSON.stringify(arg));
consumer.subscribe([topicName]);
//start consuming messages
consumer.consume();
});
consumer.on('data', function(event) {
// Output the actual message contents
const obj = JSON.parse(event.value);
console.log(obj);
});
consumer.on('disconnected', function(arg) {
console.log('consumer disconnected. ' + JSON.stringify(arg));
});
//starting the consumer
consumer.connect();
//stopping this example after 300s
setTimeout(function() {
consumer.disconnect();
}, 300000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment