Skip to content

Instantly share code, notes, and snippets.

@cefn
Created November 20, 2015 17:08
Show Gist options
  • Save cefn/be716b3099ba2194b473 to your computer and use it in GitHub Desktop.
Save cefn/be716b3099ba2194b473 to your computer and use it in GitHub Desktop.
A minimal test of MQTT publish-subscribe round-trip, implemented as a Mocha (node) test suite
var child_process = require("child_process"),
spawn = child_process.spawn,
mqtt = require("mqtt"),
mosca = require("mosca");
//The 'done' callbacks used below follow this convention:
// call the callback with no argument to report success
// call the callback with an argument (the error) to report failure
// do not call the callback while there is nothing to report
// Test configuration:
//  uri          - address handed to mqtt.connect()
//  targetCount  - number of messages that must be received before the test completes
//  requireOrder - when true, receive() reports an error for any message arriving out of sequence
var uri = "ws://127.0.0.1:3000",
    targetCount = 10,
    requireOrder = true;

// Broker selection. To run against the in-process Mosca broker instead, use:
//   var launchServer = launchMosca,
//       killServer = killMosca;
var launchServer = launchMosquitto,
    killServer = killMosquitto;

// Timing markers maintained by timestamp(); both begin as null.
var startTime, lastTime;
resetTimestamp();
/**
 * Clears both timing markers, so the next timestamp() call becomes the new
 * start of the timeline and prints no "since previous call" field.
 */
function resetTimestamp(){
    startTime = lastTime = null;
}
/**
 * Logs a report line prefixed with timing information.
 *
 * Output format: "<now>:<ms since first call>[:<ms since previous call>] <report>".
 * The third field is omitted on the first call after a reset.
 *
 * @param {*} report - text appended after the timing prefix
 */
function timestamp(report){
    var now = Date.now();
    if(startTime === null){
        // first call since the last reset: this instant is the origin
        startTime = now;
    }
    var fields = [now, now - startTime];
    if(lastTime !== null){
        fields.push(now - lastTime);
    }
    console.log(fields.join(":") + " " + report);
    lastTime = now;
}
/**
 * Publishes the next outgoing sequence number and then advances the counter.
 *
 * The message goes to topic "/<n>" with payload "<n>", using QoS 1 and the
 * retain flag, where <n> is the current value of nextMessageOut.
 *
 * @param {Function} done - called with the error if the publish fails
 * @param {Function} [acked] - optional; called with no arguments once the
 *        publish callback fires without an error
 */
function send(done, acked){
    timestamp("Sending msg:" + nextMessageOut);
    client.publish("/" + nextMessageOut, nextMessageOut.toString(), { qos:1, retain:true }, function(err){
        if(err){
            done(err);
            // a failed publish was not acknowledged, so do not notify acked
            return;
        }
        if(acked) acked();
    });
    nextMessageOut++;
}
/**
 * Handles one incoming message payload.
 *
 * The payload is parsed as a number. When requireOrder is set, it must equal
 * the next expected sequence number (nextMessageIn); otherwise an error is
 * reported and the message is not counted. Once targetCount messages have
 * been counted, done() is called to signal success.
 *
 * @param {*} bytes - message payload; converted with toString() and then Number()
 * @param {Function} done - called with an Error for an out-of-order message,
 *        or with no argument when the target count is reached
 */
function receive(bytes, done){
    var parsedMessageIn = Number(bytes.toString());
    timestamp("Received msg:" + parsedMessageIn);
    if(requireOrder && parsedMessageIn !== nextMessageIn){
        done(new Error("Out of order: expecting " + nextMessageIn + " but received " + parsedMessageIn));
        // stop here: counting a rejected message could trigger a second,
        // contradictory done() below
        return;
    }
    nextMessageIn++;
    if(nextMessageIn === targetCount) {
        done();
    }
}
// Handle of the spawned mosquitto child process (set by launchMosquitto).
var mosquittoServer;
/**
 * Spawns the mosquitto broker as a child process, using the mosquitto.conf
 * located next to this file, and keeps the process handle in mosquittoServer.
 *
 * The callback is invoked immediately after spawning; nothing waits for the
 * broker to be ready.
 *
 * @param {Function} cb - called with no arguments once the process has been spawned
 */
function launchMosquitto(cb){
    var brokerBinary = "/usr/local/sbin/mosquitto";
    var brokerArgs = ["--config-file", "./mosquitto.conf", "--verbose"];
    var spawnOptions = {cwd:__dirname};
    mosquittoServer = spawn(brokerBinary, brokerArgs, spawnOptions);
    cb(); //TODO should something be awaited in Posixland?
}
/**
 * Stops the spawned mosquitto process with SIGINT and calls back once it has exited.
 *
 * If the process has already exited (for example it failed at startup), the
 * callback is invoked straight away, because no further "exit" event will be
 * emitted to wait for.
 *
 * @param {Function} cb - called with no arguments once the broker process is gone
 */
function killMosquitto(cb){
    var proc = mosquittoServer;
    // exitCode/signalCode are non-null only after the child has terminated
    if(proc.exitCode != null || proc.signalCode != null){
        cb();
        return;
    }
    // attach the one-shot listener before signalling so the exit cannot be missed
    proc.once("exit", function(){
        cb();
    });
    proc.kill("SIGINT");
}
// The in-process Mosca broker instance (set by launchMosca, cleared by killMosca).
var moscaServer;
/**
 * Creates a Mosca broker with in-memory persistence that is served only
 * through its HTTP interface on port 3000, and stores it in moscaServer.
 *
 * @param {Function} cb - passed to the mosca.Server constructor as its callback
 */
function launchMosca(cb){
    var persistenceOptions = { factory: mosca.persistence.Memory };
    var httpOptions = { port:3000, static:false, bundle:false };
    moscaServer = new mosca.Server({
        persistence: persistenceOptions,
        http: httpOptions,
        onlyHttp: true
    }, cb);
}
/**
 * Closes the Mosca broker; once closed, drops the reference held in
 * moscaServer and then notifies the caller.
 *
 * @param {Function} cb - called with no arguments after the broker has closed
 */
function killMosca(cb){
    function onClosed(){
        moscaServer = null;
        cb();
    }
    moscaServer.close(onClosed);
}
// Per-test state: the connected MQTT client, the next sequence number to
// publish, and the next sequence number expected on receipt.
var client, nextMessageOut, nextMessageIn;
/**
 * Per-test setup: resets timing and counters, launches the broker, then
 * connects an MQTT client to it. The hook completes on the first "connect"
 * event, or fails on a launch error or on a client "error" event that occurs
 * before the connection is established.
 */
beforeEach(function(done){
    // The hook callback must fire exactly once: a later "error" event or a
    // repeated "connect" event must not call done() a second time.
    var settled = false;
    function settle(err){
        if(settled) return;
        settled = true;
        if(err) done(err);
        else done();
    }
    resetTimestamp();
    nextMessageOut = 0;
    nextMessageIn = 0;
    launchServer(function(launchErr){
        // NOTE(review): assumes a launcher reports failure through the first
        // callback argument; launchMosquitto never passes one
        if(launchErr){
            settle(launchErr);
            return;
        }
        client = mqtt.connect(uri);
        client.on("error", function(err){
            timestamp("Error:" + err.toString());
            settle(err);
        });
        client.on("connect", function(){
            timestamp("Connected");
            settle();
        });
    });
});
/**
 * Per-test teardown: force-closes the MQTT client (if one exists), clears the
 * shared reference, then shuts the broker down before completing the hook.
 */
afterEach(function(done){
    function stopServer(){
        killServer(function(){
            done();
        });
    }
    // If no client is set there is nothing to close, but the broker must
    // still be stopped so it does not leak.
    if(!client){
        stopServer();
        return;
    }
    client.end(true, function(){
        client = null;
        stopServer();
    });
});
describe("All the tests", function(){
    /**
     * Echo chain: subscribe to every topic, publish the first message, and on
     * each received message publish the next one until targetCount messages
     * have been sent. receive() completes the test once enough have arrived.
     */
    it("Can ping back messages to test roundtrip time", function(done){
        function onMessage(topic, bytes, packet){
            receive(bytes, done); //receive message
            if(nextMessageOut >= targetCount) {
                return; //everything has been sent already
            }
            send(done); //send next message
        }
        function onSubscribed(err, granted){
            if(err) {
                done(err);
                return;
            }
            timestamp("Subscribed");
            send(done); //trigger first message in echo chain
        }
        client.on("message", onMessage);
        client.subscribe('#', { qos:1 }, onSubscribed);
    });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment