Last active
April 1, 2019 13:48
-
-
Save BrainCrumbz/f71a4867a4d2c0e30ee5 to your computer and use it in GitHub Desktop.
mosca publish demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some instructions: | |
* Create two separate directories, one for the client, one for the server | |
* Copy the client-related files into the client directory. Do the same for the server files | |
* Rename client-package.json to just package.json. Do the same for server-package.json | |
At the moment, for some reason, the server message 'MQTT broker is up and running' is logged twice, as if onCreated were run twice. The 'ready' event handler, by contrast, runs only once. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
MQTT Client is starting... | |
MQTT client connected | |
MQTT client subscribing to broker/someTopic... | |
subscribe granted: [ { topic: 'broker/someTopic', qos: 1 } ] | |
######################################### | |
MQTT client received message | |
* topic broker/someTopic | |
* message <Buffer 7b 22 66 69 65 6c 64 41 22 3a 22 61 22 2c 22 66 69 65 6c 64 42 22 3a 22 62 22 7d> | |
* message (stringified) [123,34,102,105,101,108,100,65,34,58,34,97,34,44,34,102,105,101,108,100,66,34,58,34,98,34,125] | |
* packet { cmd: 'publish', | |
retain: false, | |
qos: 1, | |
dup: false, | |
length: 47, | |
topic: 'broker/someTopic', | |
payload: <Buffer 7b 22 66 69 65 6c 64 41 22 3a 22 61 22 2c 22 66 69 65 6c 64 42 22 3a 22 62 22 7d>, | |
messageId: 1 } | |
* payload (unpacked) {"fieldA":"a","fieldB":"b"} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "mosca-publish-demo-client", | |
"version": "1.0.0", | |
"description": "", | |
"main": "client.js", | |
"scripts": { | |
"test": "echo \"Error: no test specified\" && exit 1" | |
}, | |
"author": "BrainCrumbz", | |
"license": "MIT", | |
"dependencies": { | |
"mqtt": "^1.1.3" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var mqtt = require('mqtt');

console.log('MQTT Client is starting...');

/*
// remote Test Mosquitto MQTT server instance
var client = mqtt.connect('mqtt://test.mosquitto.org');
*/

// Connect to the local mosca MQTT server instance.
var client = mqtt.connect({
  port: 1883,
  host: 'localhost',
  // NOTE(review): MQTT keep-alive is expressed in seconds, so 10000 means
  // roughly 2h47m between pings. Confirm this was not meant to be 10 seconds.
  keepalive: 10000,
});

// An EventEmitter throws when 'error' is emitted with no listener attached,
// which would take the whole process down. Log the error instead.
client.on('error', function onClientError(err) {
  console.error('MQTT client error:', err);
});

// Topic this client subscribes to, and the options used for that subscription.
var topicOfInterest = 'broker/someTopic';
var topicSubscribeOpts = {
  qos: 1,
};
// Once the connection is established, subscribe to the topic of interest
// and report the outcome of the subscription request.
client.on('connect', function handleConnect() {
  console.log('MQTT client connected');
  console.log('MQTT client subscribing to ' + topicOfInterest + '...');

  // Logs whichever of the error / granted list the client hands back.
  function reportSubscription(subscribeError, grantedTopics) {
    if (subscribeError) {
      console.log('subscribe errors:', subscribeError);
    }
    if (grantedTopics) {
      console.log('subscribe granted:', grantedTopics);
    }
  }

  client.subscribe(topicOfInterest, topicSubscribeOpts, reportSubscription);
});
// Fired when a message is received on one of the subscribed topics.
// Dumps the topic, the raw message, the whole packet and the payload decoded
// as UTF-8 text to the console.
client.on('message', function onMessage(topic, message, packet) {
  var serializedMessage;
  var payloadText;

  console.log('\n\n#########################################');
  console.log('MQTT client received message');
  console.log(' * topic', topic);
  console.log(' * message', message);

  serializedMessage = JSON.stringify(message);
  console.log(' * message (stringified)', serializedMessage);

  console.log(' * packet', packet);

  payloadText = packet.payload.toString('utf8');
  console.log(' * payload (unpacked)', payloadText);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
MQTT broker is starting... | |
MQTT broker is up and running | |
MQTT broker is up and running | |
MQTT broker is ready | |
######################################### | |
MQTT broker sending message to board .. | |
MQTT broker detected a published message | |
* packet: { topic: 'broker/someTopic', | |
payload: '{"fieldA":"a","fieldB":"b"}', | |
qos: 1, | |
retain: false } | |
* packet payload: {"fieldA":"a","fieldB":"b"} | |
MQTT broker message sent |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "mosca-publish-demo-server", | |
"version": "1.0.0", | |
"description": "", | |
"main": "server.js", | |
"scripts": { | |
"test": "echo \"Error: no test specified\" && exit 1" | |
}, | |
"author": "BrainCrumbz", | |
"license": "MIT", | |
"dependencies": { | |
"mosca": "^0.29.0" | |
}, | |
"devDependencies": {} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var mosca = require('mosca');

console.log('MQTT broker is starting...');

// Settings for a broker that listens on the standard MQTT port.
var inMemoryBrokerSettings = {
  port: 1883, // mosca (mqtt) port
  persistence: mosca.persistence.Memory, // in-memory persistence
};
var brokerSettings = inMemoryBrokerSettings;

// here MQTT broker is started
var broker = new mosca.Server(brokerSettings, function onCreated(err) {
  // Report a start-up failure instead of claiming the broker is running.
  if (err) {
    console.error('MQTT broker failed to start:', err);
    return;
  }
  console.log('MQTT broker is up and running');
});
// When the broker signals 'ready', start publishing a message every 5 seconds.
broker.on('ready', function onReady() {
  var publishIntervalMs = 5000;

  console.log('MQTT broker is ready');
  setInterval(publishMessage, publishIntervalMs);
});
// Fired when a client connects: log its id.
broker.on('clientConnected', function onClientConnected(client) {
  var connectedId = client.id;
  console.log('MQTT client connected, id', connectedId);
});

// Fired when a client disconnects: log its id.
broker.on('clientDisconnected', function onClientDisconnected(client) {
  var disconnectedId = client.id;
  console.log('MQTT client disconnected, id', disconnectedId);
});
// Fired when a message is published: log the packet and its payload as text.
broker.on('published', function onPublished(packet, client) {
  var payloadText;

  console.log('MQTT broker detected a published message');
  console.log(' * packet:', packet);

  payloadText = packet.payload.toString();
  console.log(' * packet payload:', payloadText);
});
// Topic the broker publishes on.
var topicOfInterest = 'broker/someTopic';

// Sample payload, kept in three equivalent forms: plain object, JSON text,
// and the UTF-8 bytes of that JSON text.
var objectPayload = {
  fieldA: 'a',
  fieldB: 'b',
};
var textPayload = JSON.stringify(objectPayload);
// Buffer.from replaces the deprecated `new Buffer(string, encoding)` constructor.
var bufferPayload = Buffer.from(textPayload, 'utf-8');
/**
 * Publishes one QoS 1, non-retained message on `topicOfInterest` through the
 * broker, logging before the publish call and again from its callback.
 *
 * @param {string|Buffer} [payload] Payload to publish. When omitted (as when
 *   this function is invoked by a timer with no arguments) `textPayload` is
 *   used, so existing callers behave exactly as before. Pass `bufferPayload`
 *   to publish the binary form instead.
 */
function publishMessage(payload) {
  var packet = {
    topic: topicOfInterest,
    payload: payload === undefined ? textPayload : payload,
    qos: 1,
    retain: false,
  };

  console.log('\n\n#########################################');
  console.log('MQTT broker sending message to board ..\n');

  broker.publish(packet, function onBrokerPublished() {
    console.log('MQTT broker message sent');
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Giuseppe,
I wondered if you'd developed your application any further?
Looking at the outputs of your code, it seems that the packet.payload is changing depending upon the condition being experienced (client connect, client subscribe, client publish, etc.). Sometimes packet.payload is the clientID and others it might be the message.
In my application, I want to write data to a MongoDB collection ONLY when the Mosca broker server.on receives a "published" command. I want to write just the topic, message (packet.payload.toString()), and a created timestamp. However, I find that each event is being inserted into the database. I thought that the message publishing and the construction of the packet would be consistent and payload would always be "message". My database is being filled with all events instead of just message data.
The insertOne function to write to the database is being invoked on each event so I have to wonder if I have incorrectly isolated the function.
I have posted my code and outputs on Gitter/Mosca and if you have any time to look, I would really appreciate any advice to fix the problem. However, a simplified broker is listed below:
Thanks,
David Richards
var mosca = require('mosca');
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');

// Pub/sub backend configuration handed to mosca.
var pubsubsettings = {
  type: 'mongo',
  url: 'mongodb://localhost:27017/mqtt',
  pubsubCollection: 'msgStore',
  mongo: {}
};

// Server settings: MQTT port plus the pub/sub backend above.
var settings = {
  port: 1883,
  backend: pubsubsettings
};

// START THE MOSCA SERVER
var server = new mosca.Server(settings);

// Run `setup` once the server reports it is ready.
server.on('ready', setup);
// FIRED WHEN MOSCA SERVER IS READY
// Announces on the console that the server has started.
function setup() {
  var readyMessage = 'Mosca server is up and running';
  console.log(readyMessage);
}
// FIRED WHEN A MESSAGE IS RECEIVED
// The 'published' event is also emitted for the broker's own internal
// "$SYS/..." status topics (e.g. on client connect or subscribe), whose
// payload is not an application message. Skip those so that only messages
// published on application topics are written to the database.
server.on('published', function (packet, client) {
  var topic = packet && packet.topic;
  if (typeof topic === 'string' && topic.indexOf('$SYS/') === 0) {
    return;
  }
  writeDB(packet);
});
// FIRED WHEN A CLIENT SUBSCRIBES TO TOPIC
// Logs the topic that was subscribed to.
server.on('subscribed', function onSubscribed(topic, client) {
  var subscribedTopic = topic;
  console.log('Client subscribed to: ', subscribedTopic);
});
function writeDB(packet) {
var url = 'mongodb://localhost:27017/packets';
var d = new Date();
// Use connect method to connect to the Server
MongoClient.connect(url, function (err, db) {
assert.equal(null, err);
};