Skip to content

Instantly share code, notes, and snippets.

@cefn
Created August 9, 2015 14:00
Show Gist options
  • Save cefn/747c7123c4c6a34e7775 to your computer and use it in GitHub Desktop.
Save cefn/747c7123c4c6a34e7775 to your computer and use it in GitHub Desktop.
var Q = require("q");
var persistence = require("persistence");
var mosca = require("mosca");
var mqtt = require("mqtt");
var chai = require("chai");
var should = chai.should();
var Promise = Q.Promise;
function promiseServer(){
var moscaOpts = {
backend: {},
logger: {},
stats: true,
port:null,
persistence: {
factory: persistence.Memory
},
http:{
port:3000,
static:false,
bundle:false
},
onlyHttp:true
};
return new Promise(function(resolve, reject){
var mqttServer = new mosca.Server(moscaOpts);
mqttServer.on("ready", function(){
resolve(mqttServer);
});
});
}
function promiseClient(){
return new Promise(function(resolve, reject){
var mqttClient = mqtt.connect("ws://localhost:3000");
mqttClient.on("connect", function(){
resolve(mqttClient);
});
});
}
function promiseReceipt(client){
return new Promise(function(resolve, reject){
client.on("message", function(topic, message, packet){
resolve([topic, message.toString()]);
});
});
}
function promiseSubscription(client, topic){
return new Promise(function(resolve, reject){
console.log("Requesting QoS 2");
client.subscribe(topic, {qos:2}, function(err, granted){
var result = Array.prototype.slice.call(arguments);
console.log("QoS granted is: " + arguments[1][0].qos);
result.should.deep.equal([null, [{qos:1,topic:"Hello"}]]); //THIS INCORRECT TEST ACTUALLY PASSES
resolve(result);
});
});
}
function promisePublication(client,topic,message){
return new Promise(function(resolve, reject){
client.publish(topic, message, {qos:2, retain:true}, function(){ //THIS LINE IS CALLED
resolve(Array.prototype.slice.call(arguments)); //THIS LINE IS NEVER CALLED
});
})
.timeout(1000)
.then(function(result){
console.log("QoS callback from publish() : " + JSON.stringify(result));
return result;
})
.catch(function(){
console.log("No QoS callback from publish()")
return Promise.resolve();
});
}
var testTopic = "Hello",
testMessage = "World";
var testPromise = promiseServer()
.then(function() {
return promiseClient()
})
.then(function(receiver) {
var receivePromise = promiseReceipt(receiver); //initialised before publication so no events are missed
return Promise.resolve()
.then(function(){
return promiseSubscription(receiver, testTopic);
})
.then(function(){
return promiseClient();
})
.then(function(sender){
return promisePublication(sender,testTopic,testMessage);
})
.then(function(){
return receivePromise; //promise initialised earlier is passed back
});
});
testPromise
.timeout(10000)
.then(function(receiveResult){
receiveResult.should.deep.equal(["Hello","World"], "Receive not completed correctly");
console.log("Message Received");
})
.catch(function(err){
console.log("Chain failed" + err);
});
@cefn
Copy link
Author

cefn commented Aug 9, 2015

QoS level 2 is not currently supported by Mosca as described at... moscajs/mosca#327 (comment)

However, I am still investigating the non-delivery of retained messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment