Skip to content

Instantly share code, notes, and snippets.

@cefn
Created June 23, 2015 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cefn/305cfb5d74331d579bd0 to your computer and use it in GitHub Desktop.
Save cefn/305cfb5d74331d579bd0 to your computer and use it in GitHub Desktop.
RxJS zip to test networked event sequences
/** Verifies that the over-the-wire MQTT messages arising from IdeaTree operations are as expected. */
describe("Tree Protocol", function(){
it("Sends branch keys then leaf values", function(){
var sniffer = mqtt.connect(conf.mqttWsAddr);
var subscribeAndSendPromise = util.createSubscriptionPromise(sniffer, '#')
.then(function(){
writer.setItem('oldmacdonald/farm', {
sheep:"Baa",
pig:"Oink",
cow:"Moo",
duck:"Quack",
});
});
var packetExtractor = function(topic, payload, packet){
packet.message = payload.toString();
return packet;
};
var actualStream = Rx.Observable.fromEvent(sniffer, "message", function(args){
return packetExtractor.apply(null, args);
});
var targetArray = [
["the_workshop/oldmacdonald", "[]" ] , //originally empty
// ["the_workshop/oldmacdonald", JSON.stringify(['farm']) , // TODO somehow missing
// ["the_workshop/oldmacdonald/farm", JSON.stringify([]) ] , // TODO somehow missing
["the_workshop/oldmacdonald/farm", JSON.stringify(['sheep','pig','cow','duck']) ] , //n.b. JSON array
["the_workshop/oldmacdonald/farm" + "/sheep", JSON.stringify("Baa")] , //n.b. quoted JSON strings
["the_workshop/oldmacdonald/farm" + "/pig", JSON.stringify("Oink")] ,
["the_workshop/oldmacdonald/farm" + "/cow", JSON.stringify("Moo")] ,
["the_workshop/oldmacdonald/farm" + "/duck", JSON.stringify("Quack")] ,
];
var targetStream = Rx.Observable.fromArray(targetArray);
var deepEqual = function(targetValues, actualPacket){
var actualTopic = actualPacket.topic;
var actualMessage = actualPacket.message;
var targetTopic = targetValues[0];
var targetMessage = targetValues[1];
assert(actualTopic == targetTopic);
assert(actualMessage == targetMessage);
return true;
};
var zippedStream = Rx.Observable.zip(targetStream, actualStream, deepEqual);
var receivePromise = zippedStream.toPromise();
receivePromise.finally(function(){
sniffer.end();
});
return Q.all([subscribeAndSendPromise, receivePromise]);
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment