Created
July 29, 2015 09:06
-
-
Save cefn/b53f684848e04fd5bf64 to your computer and use it in GitHub Desktop.
Gist is intended to emulate a networked eventing framework, illustrating a Kefir stream issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
var _ = require("lodash"), | |
Kefir = require("kefir"), | |
events = require("events"); | |
/** This Gist is intended to emulate an eventing framework in which Javascript objects | |
* containing topic:value pairs are kept in synchrony across a network. | |
* | |
* There is a mysterious problem, in which a flatMap actually triggered by a Property event | |
* creates a stream which is also based on the same Property, but which doesn't seem to generate events. | |
* */ | |
var localValues = {}; //the local table mapping topics to values which triggers and receives remote broadcasts | |
var localValueProps = {}; //will contain streams indexed by topic notifying local changes | |
var remoteValueProps = {}; //will contain streams indexed by topic notifying remote changes | |
var localValueEmitter = new events.EventEmitter(); //used to artificially generate 'local' change events | |
var remoteValueEmitter = new events.EventEmitter(); //used to artificially generate 'remote' change events | |
//CONSTRUCT AN EVENT STREAM NOTIFYING NEW TOPICS | |
var systemEmitter = new events.EventEmitter(); | |
var notifyTopicStream = Kefir.fromEvents(systemEmitter, "newTopic"); | |
function notifyTopic(topic){ | |
systemEmitter.emit("newTopic", topic); | |
} | |
/** This is the fundamental operation being monitored and remotely synced. */ | |
function setLocalValue(topic, value){ | |
if( ! (topic in localValues) ){ | |
notifyTopic(topic); | |
} | |
localValues[topic] = value; | |
localValueEmitter.emit(topic, [topic, value]); | |
} | |
/** Changes stream to Property and activates it with lodash _.noop */ | |
function configureProperty(stream){ | |
return stream.toProperty().onValue(_.noop); | |
} | |
/** Lazy creates local property (for values set locally) */ | |
function getLocalValueProp(topic){ | |
if( ! (topic in localValueProps) ){ | |
var localStream = Kefir.fromEvents(localValueEmitter, topic); | |
localValueProps[topic] = configureProperty(localStream); | |
} | |
return localValueProps[topic]; | |
} | |
/** Lazy creates remote property (for values received via broadcast) */ | |
function getRemoteValueProp(topic){ | |
if( ! (topic in remoteValueProps) ){ | |
var remoteStream = Kefir.fromEvents(remoteValueEmitter, topic); | |
var initiallyUndefinedStream = Kefir.concat([Kefir.constant([topic, void(0)]), remoteStream]); //starts with the value pair [topic, undefined] | |
remoteValueProps[topic] = configureProperty(initiallyUndefinedStream); | |
} | |
return remoteValueProps[topic]; | |
} | |
//CREATE A STREAM WHICH NOTIFIES REMOTE UPDATES TO ANY TOPICS HELD LOCALLY | |
var broadcastMonitor = notifyTopicStream.flatMap(function(topic) { | |
return getRemoteValueProp(topic); | |
}); | |
broadcastMonitor.onValue(_.spread(function(topic, value){ | |
if(typeof value !== "undefined"){ | |
setLocalValue(topic,value); | |
} | |
})); | |
/** Streams local topic change events where | |
* - it is a string value | |
* - last received remote value doesn't match (else would create an endless loop of rebroadcast) | |
* ...therefore new value should be broadcast. */ | |
var localChangeStream = notifyTopicStream.flatMap(function(topic){ | |
return getLocalValueProp(topic).take(1); //sample first value DEBUG: this is called with topic="hello" | |
}).filter(_.spread(function(topic, value){ | |
return typeof value === "string"; //test for string value DEBUG: this line is called once with [topic, value] == ["hello", "Earth"] | |
})).flatMap(_.spread(function(topic,value){ | |
//construct a stream which serves only local updates (ignores those which are already mirrored by remote) | |
return getRemoteValueProp(topic).sampledBy(getLocalValueProp(topic), function(){ //DEBUG: this line is called once with [topic, value]==["hello","Earth"] | |
return Array.prototype.slice.call(arguments); //DEBUG: this combination operator and its dependent callbacks below are NEVER CALLED!!! | |
}).map(_.spread(function(remoteEvent, localEvent){ | |
return [remoteEvent[1], localEvent[1]]; //extracts only the value from each [topic,value] event | |
})).filter(_.spread(function(remoteValue, localValue){ | |
return remoteValue !== localValue; //check that local value change wasn't itself the result of an incoming broadcast value | |
})).map(_.spread(function(remoteValue, localValue){ | |
return [topic, localValue]; //normalize the event format ready for broadcast | |
})); | |
})); | |
localChangeStream.onValue(_.spread(function(topic, value){ | |
_.delay(function(){ //notifies a remote event with a delay (emulating network) | |
remoteValueEmitter.emit(topic, [topic,value]); | |
},100); | |
})); | |
/* | |
//skip initial undefined value, and subsequent 'Earth' value, then issue 'Mars' | |
getRemoteValueProp("hello").skip(2).take(1).onValue(function(){ | |
remoteValueEmitter.emit("hello", ["hello", "Mars"]); //should trigger change to local topic, but not be rebroadcast | |
}); | |
*/ | |
setLocalValue("hello", "Earth"); //DEBUG: should trigger broadcast of topic through localChangeStream, but doesn't |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment