Skip to content

Instantly share code, notes, and snippets.

@Vatsalya-singhi
Created February 26, 2024 15:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Vatsalya-singhi/c6d5742b8aba61954a48fa082ab5c7d2 to your computer and use it in GitHub Desktop.
Save Vatsalya-singhi/c6d5742b8aba61954a48fa082ab5c7d2 to your computer and use it in GitHub Desktop.
mqtt_sub.js
// Bridge: every message received by the MQTT client is forwarded to Kafka.
// `client`, `kafka_topic` and `produceKafkaMessage` are defined outside this snippet.
client.on('message', (receivedTopic, message) => {
  const topicName = receivedTopic.toString();
  console.log('Received message on topic:', topicName);

  // The payload is forwarded as a string to the Kafka topic named by `kafka_topic`.
  const body = message.toString();

  const onPublished = () => {
    console.log('Message published to Kafka');
  };
  // Failures are logged only; nothing is re-thrown out of the handler.
  const onFailure = (error) => {
    console.error('Error publishing message to Kafka:', error);
  };

  produceKafkaMessage(kafka_topic, body).then(onPublished).catch(onFailure);
});
mqtt_pub.js
/**
 * Publish one payload to the MQTT topic named by `mqtt_topic`.
 *
 * The payload comes from `fetch_payload()` and is serialised with
 * `JSON.stringify` before being handed to `client.publish`. The outcome is
 * only logged by the publish callback; the promise returned by this function
 * does not track it.
 *
 * @returns {Promise<void>}
 */
const publishData = async () => {
  const jsonMessage = JSON.stringify(fetch_payload());

  // Publish callback: log the outcome, success or failure.
  const reportOutcome = (err) => {
    if (!err) {
      console.log('Published:', jsonMessage);
      return;
    }
    console.error('Error occurred:', err);
  };

  client.publish(mqtt_topic, jsonMessage, reportOutcome);
};
kafka_pub.js
/**
 * Send a single message to a Kafka topic.
 *
 * Each call connects the shared `producer`, sends one GZIP-compressed message
 * keyed "iot_frame", and disconnects again. The disconnect runs in a `finally`
 * so that a failed send does not leave the connection open.
 *
 * NOTE(review): connecting and disconnecting on every call means overlapping
 * calls share one producer; confirm that a disconnect issued by one call
 * cannot interrupt another call's in-flight send.
 *
 * @param {string} topic - Kafka topic to publish to.
 * @param {string} message - Message value to send.
 * @returns {Promise<void>} Resolves once the message is sent and the producer
 *   has disconnected.
 * @throws Rejects with the connect, send or disconnect error. If both send and
 *   disconnect fail, the disconnect error is the one surfaced.
 */
const produceKafkaMessage = async (topic, message) => {
  // A failed connect is left outside the try: there is nothing to release yet.
  await producer.connect();
  try {
    await producer.send({
      topic,
      compression: CompressionTypes.GZIP,
      messages: [
        { key: "iot_frame", value: message },
      ],
    });
  } finally {
    // Always release the connection, even when send() rejects.
    await producer.disconnect();
  }
};
kafka_sub.js
/**
 * Create the Kafka consumer, subscribe to `kafka_topic` from the beginning,
 * and process incoming messages in batches.
 *
 * Fetch and timeout settings are derived from `number_of_messages`,
 * `payload_size` and `topic_TTG`, which are defined outside this snippet.
 *
 * Each batch is parsed message by message: a message whose value is missing or
 * is not valid JSON is logged and skipped, and the remaining documents are
 * still passed to `processIOTFrames`. Processing failures are logged and do
 * not stop the consumer.
 *
 * @returns {Promise<void>} Resolves once `consumer.run` has resolved.
 */
async function init() {
  // Both fetch bounds are the expected batch size: message count x payload size.
  const minBytes = number_of_messages * payload_size;
  const maxBytes = number_of_messages * payload_size;
  const maxWaitTimeInMs = number_of_messages * topic_TTG; // should be less than sessionTimeout
  const heartbeatInterval = 1 * maxWaitTimeInMs; // currently equal to maxWaitTimeInMs (multiplier is 1)
  const sessionTimeout = 3 * heartbeatInterval; // minimum 3 times heartbeatInterval

  console.log("minBytes=>", minBytes);
  console.log("maxBytes=>", maxBytes);
  console.log("maxWaitTimeInMs=>", maxWaitTimeInMs);
  console.log("heartbeatInterval=>", heartbeatInterval);
  console.log("sessionTimeout=>", sessionTimeout);

  const consumer = kafka.consumer({
    groupId: 'test-group',
    minBytes,
    maxBytes,
    maxWaitTimeInMs,
    heartbeatInterval,
    sessionTimeout,
  });

  await consumer.connect();
  await consumer.subscribe({ topic: kafka_topic, fromBeginning: true });

  await consumer.run({
    eachBatch: async ({ batch, heartbeat }) => {
      console.log("batch size=>", batch.messages.length);

      // Parse each message on its own so one bad frame cannot discard the batch.
      const docList = [];
      for (const message of batch.messages) {
        try {
          docList.push(JSON.parse(message.value.toString()));
        } catch (err) {
          console.error('parse error', err);
          console.error("check message offset=>", message.offset);
        }
      }

      if (docList.length > 0) {
        try {
          await processIOTFrames(docList);
        } catch (err) {
          // Best-effort: log and carry on so the consumer keeps running.
          console.error('Error processing IoT frames:', err);
        }
      }

      await heartbeat();
    },
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment