Created
November 20, 2015 17:08
-
-
Save cefn/be716b3099ba2194b473 to your computer and use it in GitHub Desktop.
A minimal test of MQTT publish-subscribe round-trip, implemented as a Mocha (node) test suite
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var child_process = require("child_process"), | |
spawn = child_process.spawn, | |
mqtt = require("mqtt"), | |
mosca = require("mosca"); | |
//uses the callback convention of calling 'done()' with | |
// callback with no argument for success | |
// callback with an argument for error | |
// no callback if nothing to report | |
var uri = "ws://127.0.0.1:3000"; | |
var targetCount = 10; | |
var requireOrder = true; | |
/* | |
var launchServer = launchMosca; | |
var killServer = killMosca; | |
*/ | |
var launchServer = launchMosquitto; | |
var killServer = killMosquitto; | |
var startTime, lastTime; | |
resetTimestamp(); | |
function resetTimestamp(){ | |
startTime = null; | |
lastTime = null; | |
} | |
function timestamp(report){ | |
var time = Date.now(); | |
if(startTime === null){ | |
startTime = time; | |
} | |
console.log( time + ":" + (time - startTime) + (lastTime !== null ? ":" + (time-lastTime) : "") + " " + report); | |
lastTime = time; | |
} | |
function send(done, acked){ | |
timestamp("Sending msg:" + nextMessageOut); | |
client.publish("/" + nextMessageOut, nextMessageOut.toString(), { qos:1, retain:true }, function(err, result){ | |
if(err) done(err); | |
if(acked) acked(); | |
}); | |
nextMessageOut++; | |
} | |
function receive(bytes, done){ | |
var parsedMessageIn = Number(bytes.toString()); | |
timestamp("Received msg:" + parsedMessageIn); | |
if(requireOrder && parsedMessageIn !== nextMessageIn){ | |
done(new Error("Out of order: expecting " + nextMessageIn + " but received " + parsedMessageIn)); | |
} | |
nextMessageIn++; | |
if(nextMessageIn === targetCount) { | |
done(); | |
} | |
} | |
var mosquittoServer; | |
function launchMosquitto(cb){ | |
mosquittoServer = spawn("/usr/local/sbin/mosquitto", ["--config-file", "./mosquitto.conf", "--verbose"], {cwd:__dirname}); | |
cb(); //TODO should something be awaited in Posixland? | |
} | |
function killMosquitto(cb){ | |
mosquittoServer.kill("SIGINT"); | |
mosquittoServer.on("exit", function(){ | |
cb(); | |
}) | |
} | |
var moscaServer; | |
function launchMosca(cb){ | |
var moscaSettings = { | |
persistence: { | |
factory: mosca.persistence.Memory | |
}, | |
http:{ | |
port:3000, | |
static:false, | |
bundle:false | |
}, | |
onlyHttp:true | |
}; | |
moscaServer = new mosca.Server(moscaSettings, cb); | |
} | |
function killMosca(cb){ | |
moscaServer.close(function(){ | |
moscaServer = null; | |
cb(); | |
}); | |
} | |
var client, nextMessageOut, nextMessageIn; | |
beforeEach(function(done){ | |
resetTimestamp(); | |
nextMessageOut = 0; | |
nextMessageIn = 0; | |
launchServer(function(){ | |
client = mqtt.connect(uri); | |
client.on("error", function(err){ | |
timestamp("Error:" + err.toString()); | |
done(err); | |
}); | |
client.on("connect", function(){ | |
timestamp("Connected"); | |
done(); | |
}) | |
}); | |
}); | |
afterEach(function(done){ | |
client.end(true, function(){ | |
client = null; | |
killServer(function(){ | |
done(); | |
}); | |
}); | |
}); | |
describe("All the tests", function(){ | |
it("Can ping back messages to test roundtrip time", function(done){ | |
client.on("message", function(topic, bytes, packet){ | |
receive(bytes, done); //receive message | |
if(nextMessageOut < targetCount) { | |
send(done); //send next message | |
} | |
}); | |
client.subscribe('#', { qos:1 }, function(err, granted){ | |
if(err) { done(err); } | |
else { | |
timestamp("Subscribed"); | |
send(done); //trigger first message in echo chain | |
} | |
}); | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment