Skip to content

Instantly share code, notes, and snippets.

@cefn

cefn/mosca.js

Created Aug 9, 2015
Embed
What would you like to do?
var Q = require("q");
var persistence = require("persistence");
var mosca = require("mosca");
var mqtt = require("mqtt");
var chai = require("chai");
var should = chai.should();
var Promise = Q.Promise;
function promiseServer(){
var moscaOpts = {
backend: {},
logger: {},
stats: true,
port:null,
persistence: {
factory: persistence.Memory
},
http:{
port:3000,
static:false,
bundle:false
},
onlyHttp:true
};
return new Promise(function(resolve, reject){
var mqttServer = new mosca.Server(moscaOpts);
mqttServer.on("ready", function(){
resolve(mqttServer);
});
});
}
function promiseClient(){
return new Promise(function(resolve, reject){
var mqttClient = mqtt.connect("ws://localhost:3000");
mqttClient.on("connect", function(){
resolve(mqttClient);
});
});
}
function promiseReceipt(client){
return new Promise(function(resolve, reject){
client.on("message", function(topic, message, packet){
resolve([topic, message.toString()]);
});
});
}
function promiseSubscription(client, topic){
return new Promise(function(resolve, reject){
console.log("Requesting QoS 2");
client.subscribe(topic, {qos:2}, function(err, granted){
var result = Array.prototype.slice.call(arguments);
console.log("QoS granted is: " + arguments[1][0].qos);
result.should.deep.equal([null, [{qos:1,topic:"Hello"}]]); //THIS INCORRECT TEST ACTUALLY PASSES
resolve(result);
});
});
}
function promisePublication(client,topic,message){
return new Promise(function(resolve, reject){
client.publish(topic, message, {qos:2, retain:true}, function(){ //THIS LINE IS CALLED
resolve(Array.prototype.slice.call(arguments)); //THIS LINE IS NEVER CALLED
});
})
.timeout(1000)
.then(function(result){
console.log("QoS callback from publish() : " + JSON.stringify(result));
return result;
})
.catch(function(){
console.log("No QoS callback from publish()")
return Promise.resolve();
});
}
var testTopic = "Hello",
testMessage = "World";
var testPromise = promiseServer()
.then(function() {
return promiseClient()
})
.then(function(receiver) {
var receivePromise = promiseReceipt(receiver); //initialised before publication so no events are missed
return Promise.resolve()
.then(function(){
return promiseSubscription(receiver, testTopic);
})
.then(function(){
return promiseClient();
})
.then(function(sender){
return promisePublication(sender,testTopic,testMessage);
})
.then(function(){
return receivePromise; //promise initialised earlier is passed back
});
});
testPromise
.timeout(10000)
.then(function(receiveResult){
receiveResult.should.deep.equal(["Hello","World"], "Receive not completed correctly");
console.log("Message Received");
})
.catch(function(err){
console.log("Chain failed" + err);
});
@cefn

This comment has been minimized.

Copy link
Owner Author

@cefn cefn commented Aug 9, 2015

Shows how this configuration of Mosca and MQTT downgrades QoS, misses out QoS handling callbacks from Client#publish(), and with a minor modification, shows that retained messages are never delivered.

The minor modification is outlined below. If you switch the publication and subscription clauses, then the message is never delivered, even though it's a retained message and should be delivered according to http://www.hivemq.com/mqtt-essentials-part-8-retained-messages/

Before switch

        return Promise.resolve()
            .then(function(){
                return promiseSubscription(receiver, testTopic);
            })
            .then(function(){
                return promiseClient();
            })
            .then(function(sender){
                return promisePublication(sender,testTopic,testMessage);
            })
            .then(function(){
                return receivePromise; //promise initialised earlier is passed back
            });

After switch

return Promise.resolve()
            .then(function(){
                return promiseClient();
            })
            .then(function(sender){
                return promisePublication(sender,testTopic,testMessage);
            })
            .then(function(){
                return promiseSubscription(receiver, testTopic);
            })
            .then(function(){
                return receivePromise; //promise initialised earlier is passed back
            });
@cefn

This comment has been minimized.

Copy link
Owner Author

@cefn cefn commented Aug 9, 2015

QoS level 2 is not currently supported by Mosca as described at... moscajs/mosca#327 (comment)

However, I am still investigating the non-delivery of retained messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment