Skip to content

Instantly share code, notes, and snippets.

@pietheinstrengholt
Created January 6, 2022 15:21
Show Gist options
  • Save pietheinstrengholt/e5ff72bc353f605e21eecdea8d86da27 to your computer and use it in GitHub Desktop.
Save pietheinstrengholt/e5ff72bc353f605e21eecdea8d86da27 to your computer and use it in GitHub Desktop.
var Kafka = require('node-rdkafka');
var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': 'atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net:9093', //REPLACE
'dr_cb': true, //delivery report callback
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '$ConnectionString', //do not replace $ConnectionString
'sasl.password': 'Endpoint=sb://atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net/;SharedAccessKeyName=AlternateSharedAccessKey;SharedAccessKey=WrIVbXQnYutxKXsvmfP+Wz4G4OLKHjDtuKH&6=' //REPLACE
});
var topicName = 'ATLAS_HOOK';
//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
console.log(log);
});
//logging all errors
producer.on('event.error', function(err) {
console.error('Error from producer');
console.error(err);
});
//counter to stop this sample after maxMessages are sent
var counter = 0;
var maxMessages = 1;
producer.on('delivery-report', function(err, report) {
console.log('delivery-report: ' + JSON.stringify(report));
counter++;
});
//Wait for the ready event before producing
producer.on('ready', function(arg) {
console.log('producer ready.' + JSON.stringify(arg));
for (var i = 0; i < maxMessages; i++) {
var value = new Buffer(`{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}`);
var key = "key-"+i;
// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
producer.produce(topicName, partition, value, key);
}
//need to keep polling for a while to ensure the delivery reports are received
var pollLoop = setInterval(function() {
producer.poll();
if (counter === maxMessages) {
clearInterval(pollLoop);
producer.disconnect();
}
}, 1000);
});
producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});
//starting the producer
producer.connect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment