Skip to content

Instantly share code, notes, and snippets.



Last active Aug 29, 2015
What would you like to do?
RxJS zip to test networked event sequences
/** Verifies that the over-the-wire MQTT messages arising from IdeaTree operations are as expected. */
describe("Tree Protocol", function(){
it("Sends branch keys then leaf values", function(){
var sniffer = mqtt.connect(conf.mqttWsAddr);
var subscribeAndSendPromise = util.createSubscriptionPromise(sniffer, '#')
writer.setItem('oldmacdonald/farm', {
var packetExtractor = function(topic, payload, packet){
packet.message = payload.toString();
return packet;
var actualStream = Rx.Observable.fromEvent(sniffer, "message", function(args){
return packetExtractor.apply(null, args);
var targetArray = [
["the_workshop/oldmacdonald", "[]" ] , //originally empty
["the_workshop/oldmacdonald/farm", JSON.stringify(['sheep','pig','cow','duck']) ] , //n.b. JSON array
["the_workshop/oldmacdonald/farm" + "/sheep", JSON.stringify("Baa")] , //n.b. quoted JSON strings
["the_workshop/oldmacdonald/farm" + "/pig", JSON.stringify("Oink")] ,
["the_workshop/oldmacdonald/farm" + "/cow", JSON.stringify("Moo")] ,
["the_workshop/oldmacdonald/farm" + "/duck", JSON.stringify("Quack")] ,
var targetStream = Rx.Observable.fromArray(targetArray);
var deepEqual = function(targetValues, actualPacket){
var actualTopic = actualPacket.topic;
var actualMessage = actualPacket.message;
var targetTopic = targetValues[0];
var targetMessage = targetValues[1];
assert(actualTopic == targetTopic);
assert(actualMessage == targetMessage);
return true;
var zippedStream =, actualStream, deepEqual);
var receivePromise = zippedStream.toPromise();
return Q.all([subscribeAndSendPromise, receivePromise]);

This comment has been minimized.

Copy link
Owner Author

@cefn cefn commented Jun 23, 2015

Struggling to understand why the receivePromise never seems to complete. Looks like zippedStream doesn't terminate when the targetStream terminates, and I don't know how to force this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment