Created
January 6, 2022 15:21
-
-
Save pietheinstrengholt/e5ff72bc353f605e21eecdea8d86da27 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Kafka = require('node-rdkafka'); | |
var producer = new Kafka.Producer({ | |
//'debug' : 'all', | |
'metadata.broker.list': 'atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net:9093', //REPLACE | |
'dr_cb': true, //delivery report callback | |
'security.protocol': 'SASL_SSL', | |
'sasl.mechanisms': 'PLAIN', | |
'sasl.username': '$ConnectionString', //do not replace $ConnectionString | |
'sasl.password': 'Endpoint=sb://atlas-004133bc-3c87-4862-bf9d-b0ea6ae351f5.servicebus.windows.net/;SharedAccessKeyName=AlternateSharedAccessKey;SharedAccessKey=WrIVbXQnYutxKXsvmfP+Wz4G4OLKHjDtuKH&6=' //REPLACE | |
}); | |
var topicName = 'ATLAS_HOOK'; | |
//logging debug messages, if debug is enabled | |
producer.on('event.log', function(log) { | |
console.log(log); | |
}); | |
//logging all errors | |
producer.on('event.error', function(err) { | |
console.error('Error from producer'); | |
console.error(err); | |
}); | |
//counter to stop this sample after maxMessages are sent | |
var counter = 0; | |
var maxMessages = 1; | |
producer.on('delivery-report', function(err, report) { | |
console.log('delivery-report: ' + JSON.stringify(report)); | |
counter++; | |
}); | |
//Wait for the ready event before producing | |
producer.on('ready', function(arg) { | |
console.log('producer ready.' + JSON.stringify(arg)); | |
for (var i = 0; i < maxMessages; i++) { | |
var value = new Buffer(`{ | |
"msgCreatedBy":"nayenama", | |
"message":{ | |
"type":"ENTITY_CREATE_V2", | |
"user":"admin", | |
"entities":{ | |
"entities":[ | |
{ | |
"typeName":"azure_sql_table", | |
"attributes":{ | |
"owner":"admin", | |
"temporary":false, | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable", | |
"name":"SalesOrderTable", | |
"description":"Sales Order Table added via Kafka" | |
}, | |
"relationshipAttributes":{ | |
"columns":[ | |
{ | |
"guid":"-1102395743156037", | |
"typeName":"azure_sql_column", | |
"uniqueAttributes":{ | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID" | |
} | |
}, | |
{ | |
"guid":"-1102395743156038", | |
"typeName":"azure_sql_column", | |
"uniqueAttributes":{ | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate" | |
} | |
} | |
] | |
}, | |
"guid":"-1102395743156036", | |
"version":0 | |
} | |
], | |
"referredEntities":{ | |
"-1102395743156037":{ | |
"typeName":"azure_sql_column", | |
"attributes":{ | |
"owner":null, | |
"userTypeId":61, | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID", | |
"precision":23, | |
"length":8, | |
"description":"Sales Order ID", | |
"scale":3, | |
"name":"OrderID", | |
"data_type":"int" | |
}, | |
"relationshipAttributes":{ | |
"table":{ | |
"guid":"-1102395743156036", | |
"typeName":"azure_sql_table", | |
"entityStatus":"ACTIVE", | |
"displayText":"SalesOrderTable", | |
"uniqueAttributes":{ | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable" | |
} | |
} | |
}, | |
"guid":"-1102395743156037", | |
"version":2 | |
}, | |
"-1102395743156038":{ | |
"typeName":"azure_sql_column", | |
"attributes":{ | |
"owner":null, | |
"userTypeId":61, | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate", | |
"description":"Sales Order Date", | |
"scale":3, | |
"name":"OrderDate", | |
"data_type":"datetime" | |
}, | |
"relationshipAttributes":{ | |
"table":{ | |
"guid":"-1102395743156036", | |
"typeName":"azure_sql_table", | |
"entityStatus":"ACTIVE", | |
"displayText":"SalesOrderTable", | |
"uniqueAttributes":{ | |
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable" | |
} | |
} | |
}, | |
"guid":"-1102395743156038", | |
"status":"ACTIVE", | |
"createdBy":"ServiceAdmin", | |
"version":0 | |
} | |
} | |
} | |
}, | |
"version":{ | |
"version":"1.0.0" | |
}, | |
"msgCompressionKind":"NONE", | |
"msgSplitIdx":1, | |
"msgSplitCount":1 | |
}`); | |
var key = "key-"+i; | |
// if partition is set to -1, librdkafka will use the default partitioner | |
var partition = -1; | |
producer.produce(topicName, partition, value, key); | |
} | |
//need to keep polling for a while to ensure the delivery reports are received | |
var pollLoop = setInterval(function() { | |
producer.poll(); | |
if (counter === maxMessages) { | |
clearInterval(pollLoop); | |
producer.disconnect(); | |
} | |
}, 1000); | |
}); | |
producer.on('disconnected', function(arg) { | |
console.log('producer disconnected. ' + JSON.stringify(arg)); | |
}); | |
//starting the producer | |
producer.connect(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment