Created
February 26, 2024 15:02
-
-
Save Vatsalya-singhi/c6d5742b8aba61954a48fa082ab5c7d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mqtt_sub.js
// Bridge: forward every MQTT message we receive into Kafka.
client.on('message', async (receivedTopic, message) => {
    console.log('Received message on topic:', receivedTopic.toString());
    try {
        // Relay the raw MQTT payload to the configured Kafka topic.
        await produceKafkaMessage(kafka_topic, message.toString());
        console.log('Message published to Kafka');
    } catch (error) {
        // Log and keep the subscriber alive; a failed relay must not
        // tear down the MQTT connection.
        console.error('Error publishing message to Kafka:', error);
    }
});
mqtt_pub.js
// Publish the current payload to the MQTT topic.
// Resolves once the broker callback has fired, so `await publishData()`
// actually waits for the publish to complete (the original was `async`
// but never awaited anything, resolving before the publish finished).
// Errors are logged, never rethrown, preserving fire-and-forget callers.
const publishData = async () => {
    const payload = fetch_payload();
    const jsonMessage = JSON.stringify(payload);
    // Adapt the callback-style client.publish to a Promise. Always
    // resolve (even on error) to keep the original log-only semantics.
    await new Promise((resolve) => {
        client.publish(mqtt_topic, jsonMessage, (err) => {
            if (err) {
                console.error('Error occurred:', err);
            } else {
                console.log('Published:', jsonMessage);
            }
            resolve();
        });
    });
}
kafka_pub.js
// Produce a single gzip-compressed message to the given Kafka topic.
// @param {string} topic   - destination Kafka topic
// @param {string} message - serialized payload to send under key "iot_frame"
// Connects per call and ALWAYS disconnects, even when send() rejects —
// the original skipped disconnect() on failure and leaked the connection.
const produceKafkaMessage = async (topic, message) => {
    await producer.connect();
    try {
        // Sending a message
        await producer.send({
            topic: topic,
            compression: CompressionTypes.GZIP,
            messages: [
                { key: "iot_frame", value: message },
            ],
        });
    } finally {
        // Guarantee cleanup on both success and failure.
        await producer.disconnect();
    }
};
kafka_sub.js
// Initialize a Kafka consumer that decodes IOT frames in batches and
// hands each decoded batch to processIOTFrames().
// Tuning values are derived from the expected message count and payload size.
async function init() {
    const minBytes = number_of_messages * payload_size;
    // NOTE(review): maxBytes equals minBytes, so every fetch waits for
    // exactly one "full" batch or the timeout — confirm this is intended
    // rather than maxBytes being a multiple of the batch size.
    const maxBytes = (number_of_messages) * payload_size;
    const maxWaitTimeInMs = number_of_messages * topic_TTG; // should be less than sessionTimeout
    // heartbeatInterval equals maxWaitTimeInMs. (The original comment
    // claimed "double of maxWaitTimeInMs", contradicting the 1x multiplier;
    // the code is kept as-is and the comment corrected.)
    const heartbeatInterval = 1 * maxWaitTimeInMs;
    const sessionTimeout = 3 * heartbeatInterval; // Kafka requires sessionTimeout >= 3 * heartbeatInterval
    console.log("minBytes=>", minBytes);
    console.log("maxBytes=>", maxBytes);
    console.log("maxWaitTimeInMs=>", maxWaitTimeInMs);
    console.log("heartbeatInterval=>", heartbeatInterval);
    console.log("sessionTimeout=>", sessionTimeout);
    const consumer = kafka.consumer({
        groupId: 'test-group',
        minBytes: minBytes,
        maxBytes: maxBytes,
        maxWaitTimeInMs: maxWaitTimeInMs,
        heartbeatInterval: heartbeatInterval,
        sessionTimeout: sessionTimeout,
    });
    await consumer.connect();
    await consumer.subscribe({ topic: kafka_topic, fromBeginning: true });
    await consumer.run({
        eachBatch: async ({ batch, heartbeat }) => {
            console.log("batch size=>", batch.messages.length);
            try {
                // Decode every message in the batch, then persist them in one call.
                const docList = _.map(batch.messages, (message) => JSON.parse(message.value.toString()));
                await processIOTFrames(docList);
            } catch (err) {
                // One malformed payload aborts the whole batch; log the raw
                // messages for inspection instead of crashing the consumer.
                console.log('parse error', err);
                console.log("check batch=>", batch.messages);
            }
            // Keep the group session alive after each batch.
            await heartbeat();
        },
    })
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.