Skip to content

Instantly share code, notes, and snippets.

@BrainCrumbz
Last active April 1, 2019 13:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save BrainCrumbz/f71a4867a4d2c0e30ee5 to your computer and use it in GitHub Desktop.
Save BrainCrumbz/f71a4867a4d2c0e30ee5 to your computer and use it in GitHub Desktop.
mosca publish demo
Some instructions:
* Create two separate directories, one for the client, one for the server
* Copy client related files in its directory. Do the same for server files
* Rename client-package.json as just package.json. Do the same for server-package.json
At the moment, for some reason, server message 'MQTT broker is up and running' is logged twice, as if onCreated is run twice. 'ready' event, instead, runs only once.
MQTT Client is starting...
MQTT client connected
MQTT client subscribing to broker/someTopic...
subscribe granted: [ { topic: 'broker/someTopic', qos: 1 } ]
#########################################
MQTT client received message
* topic broker/someTopic
* message <Buffer 7b 22 66 69 65 6c 64 41 22 3a 22 61 22 2c 22 66 69 65 6c 64 42 22 3a 22 62 22 7d>
* message (stringified) [123,34,102,105,101,108,100,65,34,58,34,97,34,44,34,102,105,101,108,100,66,34,58,34,98,34,125]
* packet { cmd: 'publish',
retain: false,
qos: 1,
dup: false,
length: 47,
topic: 'broker/someTopic',
payload: <Buffer 7b 22 66 69 65 6c 64 41 22 3a 22 61 22 2c 22 66 69 65 6c 64 42 22 3a 22 62 22 7d>,
messageId: 1 }
* payload (unpacked) {"fieldA":"a","fieldB":"b"}
{
"name": "mosca-publish-demo-client",
"version": "1.0.0",
"description": "",
"main": "client.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "BrainCrumbz",
"license": "MIT",
"dependencies": {
"mqtt": "^1.1.3"
}
}
var mqtt = require('mqtt');
console.log('MQTT Client is starting...');
/*
// remote Test Mosquitto MQTT server instance
var client = mqtt.connect('mqtt://test.mosquitto.org');
*/
// local mosca MQTT server instance
var client = mqtt.connect({
port: 1883,
host: 'localhost',
keepalive: 10000,
});
var topicOfInterest = 'broker/someTopic';
var topicSubscribeOpts = {
qos: 1,
};
client.on('connect', function () {
console.log('MQTT client connected');
console.log('MQTT client subscribing to ' + topicOfInterest + '...');
client.subscribe(topicOfInterest, topicSubscribeOpts, function onSubscribe(err, granted) {
if (err) {
console.log('subscribe errors:', err);
}
if (granted) {
console.log('subscribe granted:', granted);
}
});
});
// fired when a message is received on one of the subscribed topic
client.on('message', function (topic, message, packet) {
console.log('\n\n#########################################');
console.log('MQTT client received message');
console.log(' * topic', topic);
console.log(' * message', message);
console.log(' * message (stringified)', JSON.stringify(message));
console.log(' * packet', packet);
console.log(' * payload (unpacked)', packet.payload.toString('utf8'));
});
MQTT broker is starting...
MQTT broker is up and running
MQTT broker is up and running
MQTT broker is ready
#########################################
MQTT broker sending message to board ..
MQTT broker detected a published message
* packet: { topic: 'broker/someTopic',
payload: '{"fieldA":"a","fieldB":"b"}',
qos: 1,
retain: false }
* packet payload: {"fieldA":"a","fieldB":"b"}
MQTT broker message sent
{
"name": "mosca-publish-demo-server",
"version": "1.0.0",
"description": "",
"main": "server.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "BrainCrumbz",
"license": "MIT",
"dependencies": {
"mosca": "^0.29.0"
},
"devDependencies": {}
}
var mosca = require('mosca');
console.log('MQTT broker is starting...');
var inMemoryBrokerSettings = {
port: 1883, // mosca (mqtt) port
persistence: mosca.persistence.Memory, // using ascoltatore over memory
};
var brokerSettings = inMemoryBrokerSettings;
// here MQTT broker is started
var broker = new mosca.Server(brokerSettings, function onCreated(err, broker) {
// assume no errors
console.log('MQTT broker is up and running');
});
broker.on('ready', function onReady() {
console.log('MQTT broker is ready')
setInterval(publishMessage, 5000);
});
// fired when a client connects
broker.on('clientConnected', function onClientConnected(client) {
console.log('MQTT client connected, id', client.id);
});
// fired when a client disconnects
broker.on('clientDisconnected', function onClientDisconnected(client) {
console.log('MQTT client disconnected, id', client.id);
});
// fired when a message is published
broker.on('published', function onPublished(packet, client) {
console.log('MQTT broker detected a published message');
console.log(' * packet:', packet);
console.log(' * packet payload:', packet.payload.toString());
});
var topicOfInterest = 'broker/someTopic';
var objectPayload = {
fieldA: 'a',
fieldB: 'b',
};
var textPayload = JSON.stringify(objectPayload);
var bufferPayload = new Buffer(textPayload, 'utf-8');
function publishMessage() {
var packet = {
topic: topicOfInterest,
payload: textPayload,
//payload: bufferPayload,
qos: 1,
retain: false,
};
console.log('\n\n#########################################');
console.log('MQTT broker sending message to board ..\n');
broker.publish(packet, function() {
console.log('MQTT broker message sent');
});
}
@mtngit14
Copy link

mtngit14 commented Aug 3, 2016

Hi Giuseppe,
I wondered if you'd developed your application any further?
Looking at the outputs of your code, it seems that the packet.payload is changing depending upon the condition being experienced (client connect, client subscribe, client publish, etc.). Sometimes packet.payload is the clientID and others it might be the message.
In my application, I want to write data to a MongoDB collection ONLY when the Mosca broker server.on receives a "published" command. I want to write just the topic, message (packet.payload.toString()), and a created timestamp. However, I find that each event is being inserted into the database. I thought that the message publishing and the construction of the packet would be consistent and payload would always be "message". My database is being filled with all events instead of just message data.
The insertOne function to write to the database is being invoked on each event so I have to wonder if I have incorrectly isolated the function.
I have posted my code and outputs on Gitter/Mosca and if you have any time to look, I would really appreciate any advice to fix the problem. However, a simplified broker is listed below:
Thanks,
David Richards

var mosca = require('mosca')
, MongoClient = require('mongodb').MongoClient
, assert = require('assert');

var pubsubsettings = {
type: 'mongo',
url: 'mongodb://localhost:27017/mqtt',
pubsubCollection: 'msgStore',
mongo: {}
};

var settings = {
port: 1883,
backend: pubsubsettings
};

// START THE MOSCA SERVER
var server = new mosca.Server(settings);
server.on('ready', setup);

// FIRED WHEN MOSCA SERVER IS READY
function setup() {
console.log('Mosca server is up and running');
}

// FIRED WHEN A MESSAGE IS RECEIVED
server.on('published', function (packet, client) {
writeDB(packet);
});

// FIRED WHEN A CLIENT SUBSCRIBES TO TOPIC
server.on('subscribed', function (topic, client) {
console.log('Client subscribed to: ', topic);
});

function writeDB(packet) {
var url = 'mongodb://localhost:27017/packets';
var d = new Date();
// Use connect method to connect to the Server
MongoClient.connect(url, function (err, db) {
assert.equal(null, err);

    console.log("PacketContent", packet);
    console.log('Published-Payload:', packet.payload.toString());

    db.collection('writeTest5').insertOne({topic: packet.topic, message: packet.payload.toString(), time: d.toISOString()}, function (err, r) {
        assert.equal(null, err);
        assert.equal(1, r.insertedCount);
        db.close();
    });
});

};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment