Skip to content

Instantly share code, notes, and snippets.

@cefn
Created August 9, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cefn/747c7123c4c6a34e7775 to your computer and use it in GitHub Desktop.
Save cefn/747c7123c4c6a34e7775 to your computer and use it in GitHub Desktop.
var Q = require("q");
var persistence = require("persistence");
var mosca = require("mosca");
var mqtt = require("mqtt");
var chai = require("chai");
var should = chai.should();
var Promise = Q.Promise;
function promiseServer(){
var moscaOpts = {
backend: {},
logger: {},
stats: true,
port:null,
persistence: {
factory: persistence.Memory
},
http:{
port:3000,
static:false,
bundle:false
},
onlyHttp:true
};
return new Promise(function(resolve, reject){
var mqttServer = new mosca.Server(moscaOpts);
mqttServer.on("ready", function(){
resolve(mqttServer);
});
});
}
function promiseClient(){
return new Promise(function(resolve, reject){
var mqttClient = mqtt.connect("ws://localhost:3000");
mqttClient.on("connect", function(){
resolve(mqttClient);
});
});
}
function promiseReceipt(client){
return new Promise(function(resolve, reject){
client.on("message", function(topic, message, packet){
resolve([topic, message.toString()]);
});
});
}
function promiseSubscription(client, topic){
return new Promise(function(resolve, reject){
console.log("Requesting QoS 2");
client.subscribe(topic, {qos:2}, function(err, granted){
var result = Array.prototype.slice.call(arguments);
console.log("QoS granted is: " + arguments[1][0].qos);
result.should.deep.equal([null, [{qos:1,topic:"Hello"}]]); //THIS INCORRECT TEST ACTUALLY PASSES
resolve(result);
});
});
}
function promisePublication(client,topic,message){
return new Promise(function(resolve, reject){
client.publish(topic, message, {qos:2, retain:true}, function(){ //THIS LINE IS CALLED
resolve(Array.prototype.slice.call(arguments)); //THIS LINE IS NEVER CALLED
});
})
.timeout(1000)
.then(function(result){
console.log("QoS callback from publish() : " + JSON.stringify(result));
return result;
})
.catch(function(){
console.log("No QoS callback from publish()")
return Promise.resolve();
});
}
var testTopic = "Hello",
testMessage = "World";
var testPromise = promiseServer()
.then(function() {
return promiseClient()
})
.then(function(receiver) {
var receivePromise = promiseReceipt(receiver); //initialised before publication so no events are missed
return Promise.resolve()
.then(function(){
return promiseSubscription(receiver, testTopic);
})
.then(function(){
return promiseClient();
})
.then(function(sender){
return promisePublication(sender,testTopic,testMessage);
})
.then(function(){
return receivePromise; //promise initialised earlier is passed back
});
});
testPromise
.timeout(10000)
.then(function(receiveResult){
receiveResult.should.deep.equal(["Hello","World"], "Receive not completed correctly");
console.log("Message Received");
})
.catch(function(err){
console.log("Chain failed" + err);
});
@cefn
Copy link
Author

cefn commented Aug 9, 2015

Shows how this configuration of Mosca and MQTT downgrades QoS, misses out QoS handling callbacks from Client#publish(), and with a minor modification, shows that retained messages are never delivered.

The minor modification is outlined below. If you switch the publication and subscription clauses, then the message is never delivered, even though it's a retained message and should be delivered according to http://www.hivemq.com/mqtt-essentials-part-8-retained-messages/

Before switch

        return Promise.resolve()
            .then(function(){
                return promiseSubscription(receiver, testTopic);
            })
            .then(function(){
                return promiseClient();
            })
            .then(function(sender){
                return promisePublication(sender,testTopic,testMessage);
            })
            .then(function(){
                return receivePromise; //promise initialised earlier is passed back
            });

After switch

return Promise.resolve()
            .then(function(){
                return promiseClient();
            })
            .then(function(sender){
                return promisePublication(sender,testTopic,testMessage);
            })
            .then(function(){
                return promiseSubscription(receiver, testTopic);
            })
            .then(function(){
                return receivePromise; //promise initialised earlier is passed back
            });

@cefn
Copy link
Author

cefn commented Aug 9, 2015

QoS level 2 is not currently supported by Mosca as described at... moscajs/mosca#327 (comment)

However, I am still investigating the non-delivery of retained messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment