Skip to content

Instantly share code, notes, and snippets.

@cefn
Last active August 29, 2015 14:23
Show Gist options
  • Save cefn/e6595f7abf20a58d1d51 to your computer and use it in GitHub Desktop.
Save cefn/e6595f7abf20a58d1d51 to your computer and use it in GitHub Desktop.
RxJS zip to test networked event sequences
/** Verifies that the over-the-wire MQTT messages arising from IdeaTree operations are as expected. */
describe("Tree Protocol", function(){
it("Sends branch keys then leaf values", function(){
var sniffer = mqtt.connect(conf.mqttWsAddr);
var subscribeAndSendPromise = util.createSubscriptionPromise(sniffer, '#')
.then(function(){
writer.setItem('oldmacdonald/farm', {
sheep:"Baa",
pig:"Oink",
cow:"Moo",
duck:"Quack",
});
});
var packetExtractor = function(topic, payload, packet){
packet.message = payload.toString();
return packet;
};
var actualStream = Rx.Observable.fromEvent(sniffer, "message", function(args){
return packetExtractor.apply(null, args);
});
var targetArray = [
["the_workshop/oldmacdonald", "[]" ] , //originally empty
["the_workshop/oldmacdonald/farm", JSON.stringify(['sheep','pig','cow','duck']) ] , //n.b. JSON array
["the_workshop/oldmacdonald/farm" + "/sheep", JSON.stringify("Baa")] , //n.b. quoted JSON strings
["the_workshop/oldmacdonald/farm" + "/pig", JSON.stringify("Oink")] ,
["the_workshop/oldmacdonald/farm" + "/cow", JSON.stringify("Moo")] ,
["the_workshop/oldmacdonald/farm" + "/duck", JSON.stringify("Quack")] ,
];
var targetStream = Rx.Observable.fromArray(targetArray);
var deepEqual = function(targetValues, actualPacket){
var actualTopic = actualPacket.topic;
var actualMessage = actualPacket.message;
var targetTopic = targetValues[0];
var targetMessage = targetValues[1];
assert(actualTopic == targetTopic);
assert(actualMessage == targetMessage);
return true;
};
var zippedStream = Rx.Observable.zip(targetStream, actualStream, deepEqual);
var receivePromise = zippedStream.toPromise();
receivePromise.finally(function(){
sniffer.end();
});
return Q.all([subscribeAndSendPromise, receivePromise]);
})
@cefn
Copy link
Author

cefn commented Jun 23, 2015

Struggling to understand why the receivePromise never seems to complete. Looks like zippedStream doesn't terminate when the targetStream terminates, and I don't know how to force this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment