Skip to content

Instantly share code, notes, and snippets.

@jenschr
Created November 5, 2019 08:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jenschr/8ad998dd578a227c3111ea0157b6dad3 to your computer and use it in GitHub Desktop.
Save jenschr/8ad998dd578a227c3111ea0157b6dad3 to your computer and use it in GitHub Desktop.
var mqtt = require('mqtt'); //https://www.npmjs.com/package/mqtt
var Topic = '#'; //subscribe to all topics
var Broker_URL = 'mqtt://localhost';
var Database_URL = 'localhost';
var options = {
clientId: 'MyMQTT'+Math.random()*64545353,
port: 1883,
username: 'MyUsername',
password: 'MyPassword',
keepalive : 60
};
// Generic wait-function. Used to throttle MQTT connections.
var wait = function(startTime){
var d = new Date();
var millis = d.getTime();
var stopAt = millis+(startTime*1000);
console.log("Start waiting "+startTime+" seconds");
while(millis < stopAt ){
d = new Date();
millis = d.getTime();
//console.log("d: "+millis);
}
console.log("Finished waiting "+startTime+" seconds");
}
var client = mqtt.connect(Broker_URL, options);
client.on('connect', mqtt_connect);
client.on('reconnect', mqtt_reconnect);
client.on('error', mqtt_error);
client.on('message', mqtt_messsageReceived);
client.on('close', mqtt_close);
function mqtt_connect() {
wait(1);
console.log("Connecting MQTT");
client.subscribe(Topic, mqtt_subscribe);
};
function mqtt_subscribe(err, granted) {
console.log("Subscribed to " + Topic);
if (err) {console.log(err);}
};
function mqtt_reconnect(err) {
wait(2);
console.log("Reconnect MQTT");
if (err) {console.log(err);}
options.clientId = 'MyMQTT'+Math.random()*64545353;
client = mqtt.connect(Broker_URL, options);
};
function mqtt_error(err) {
console.log("Error!");
if (err) {console.log(err);}
};
function after_publish() {
//do nothing
};
//receive a message from MQTT broker
function mqtt_messsageReceived(topic, message, packet) {
var message_str = message.toString(); //convert byte array to string
message_str = message_str.replace(/\n$/, ''); //remove new line
console.log( "instances: "+countInstances(message_str));
var startAt = topic.indexOf("/machines/");
var parseTo = topic.indexOf("/",10);
var machineId = topic.substring(10,parseTo);
var parsedTopic = topic.substring(parseTo+1);
//payload syntax: clientID,topic,message
if (countInstances(message_str) == -1) {
console.log("Invalid payload");
console.log(message_str);
} else {
insert_message(parsedTopic, machineId, message_str);
console.log("topic: "+parsedTopic);
console.log("machine: "+machineId);
console.log("payload: "+message_str);
}
};
function mqtt_close() {
console.log("Close MQTT");
};
////////////////////////////////////////////////////
///////////////////// MYSQL ////////////////////////
////////////////////////////////////////////////////
var mysql = require('mysql'); //https://www.npmjs.com/package/mysql
//Create Connection
var connection = mysql.createConnection({
host: Database_URL,
user: "ee_bruker",
password: "p[gsqJ4mL8;UR}T@",
database: "rost_client_website"
});
connection.connect(function(err) {
if (err) throw err;
console.log("Database Connected!");
});
//insert a row into the tbl_messages table
function insert_message(topic, machine, data) {
sql = "INSERT INTO events (topic,machine,content) VALUES ('"+topic+"','"+machine+"','"+data+"');";
connection.query(sql, function (error, results) {
if (error) throw error;
console.log("Message added: " + data);
});
};
//split a string into an array of substrings
function extract_string(message_str) {
var message_arr = message_str.split(","); //convert to array
return message_arr;
};
//count number of delimiters in a string
var delimiter = ",";
function countInstances(message_str) {
var substrings = message_str.split(delimiter);
return substrings.length - 1;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment