Skip to content

Instantly share code, notes, and snippets.

@cefn
Created August 9, 2015 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cefn/2e87416aa24bfdddbb7d to your computer and use it in GitHub Desktop.
Save cefn/2e87416aa24bfdddbb7d to your computer and use it in GitHub Desktop.
var mosca = require("mosca");
var mqtt = require("mqtt");
function promiseMosca(){
var moscaOpts = {
http:{
port:3000,
static:false,
bundle:false
},
onlyHttp:true
};
return new Promise(function(resolve, reject){
var mqttServer = new mosca.Server(moscaOpts);
mqttServer.on("ready", function(){
resolve(mqttServer);
});
});
}
function promiseMosquitto(){
return Promise.resolve();
}
function promiseClient(){
return new Promise(function(resolve, reject){
var mqttClient = mqtt.connect("ws://localhost:3000");
mqttClient.on("connect", function(){
resolve(mqttClient);
});
});
}
function promiseReceipt(client){
return new Promise(function(resolve, reject){
client.on("message", function(topic, message, packet){
resolve([topic, message.toString()]);
});
});
}
function promiseSubscription(client, topic){
return new Promise(function(resolve, reject){
client.subscribe(topic, {qos:1}, function(err, granted){
var result = Array.prototype.slice.call(arguments);
resolve(result);
});
});
}
function promisePublication(client,topic,message){
return new Promise(function(resolve, reject){
client.publish(topic, message, {qos:1, retain:true}, function(){
resolve(Array.prototype.slice.call(arguments));
});
});
}
var testTopic = "Hello",
testMessage = "World";
//var serverPromise = promiseMosca();
var serverPromise = promiseMosquitto();
var testPromise = serverPromise
.then(function() {
return promiseClient()
})
.then(function(receiver) {
var receivePromise = promiseReceipt(receiver); //initialised before publication so no events are missed
return Promise.resolve()
.then(function(){
return promiseClient();
})
.then(function(sender){
return promisePublication(sender,testTopic,testMessage);
})
.then(function(){ //move this clause to first in the list, and Mosca delivers the message!
return promiseSubscription(receiver, testTopic);
})
.then(function(){
return receivePromise; //promise initialised earlier is passed back
});
});
testPromise.then(function(receiveResult){
console.log("Message Received: " + JSON.stringify(receiveResult));
});
@cefn
Copy link
Author

cefn commented Aug 9, 2015

The code above is demonstrated to work where there is a localhost Mosquitto broker (configured with Websocket support) on port 3000.

It outputs...

Message Received: ["Hello","World"]

However, if you shut down the Mosquitto websockets server on 3000, and change the serverPromise to be promiseMosca() instead of promiseMosquitto() then the test case fails.

@cefn
Copy link
Author

cefn commented Aug 9, 2015

I should underline that the test case fails by never completing the callback, (so it appears as a hang), although the original version used Q to force the promise to concretely be rejected after a known timeout, the library dependency on Q has been removed.

@roccomuso
Copy link

Don't you need a persistence to deliver retained message with Mosca?

@cefn
Copy link
Author

cefn commented Nov 23, 2015

Thanks for taking the time to look at this!. Providing a configuration like the one below indeed resolves the issue altogether.

Although I previously had a persistence configuration (and removed it to make a more minimal test case) there must have been something wrong with the configuration I provided. This one is minimal but works...

var moscaOpts = {
        http:{
            port:3000,
            static:false,
            bundle:false
        },
        persistence: {
            factory: mosca.persistence.Memory
        },
        onlyHttp:true
    };

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment