Skip to content

Instantly share code, notes, and snippets.

@cefn
Created July 29, 2015 09:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cefn/b53f684848e04fd5bf64 to your computer and use it in GitHub Desktop.
Save cefn/b53f684848e04fd5bf64 to your computer and use it in GitHub Desktop.
Gist is intended to emulate a networked eventing framework, illustrating a Kefir stream issue
"use strict";
var _ = require("lodash"),
Kefir = require("kefir"),
events = require("events");
/** This Gist is intended to emulate an eventing framework in which Javascript objects
* containing topic:value pairs are kept in synchrony across a network.
*
* The problem illustrated here: a flatMap callback, itself triggered by an event from a Property,
* creates a new stream derived from that same Property, yet the new stream never seems to emit any events.
* */
//MODULE STATE SHARED BY THE FUNCTIONS AND STREAMS BELOW
var localValues = {},                               //topic -> latest value; written by setLocalValue()
    localValueProps = {},                           //topic -> cached property notifying local changes
    remoteValueProps = {},                          //topic -> cached property notifying remote changes
    localValueEmitter = new events.EventEmitter(),  //artificially generates 'local' change events
    remoteValueEmitter = new events.EventEmitter(); //artificially generates 'remote' change events
//NEW-TOPIC NOTIFICATIONS: a dedicated emitter whose "newTopic" events are exposed as a Kefir stream
var systemEmitter = new events.EventEmitter(),
    notifyTopicStream = Kefir.fromEvents(systemEmitter, "newTopic");
/**
 * Announces a topic by emitting a "newTopic" event on systemEmitter,
 * with the topic itself as the event payload.
 * @param {*} topic - the topic to announce (callers in this file pass string keys)
 */
function notifyTopic(topic){
    var eventName = "newTopic";
    systemEmitter.emit(eventName, topic);
}
/**
 * Stores a value for a topic in the local table and emits the change on localValueEmitter.
 * This is the fundamental operation being monitored and remotely synced.
 *
 * A topic with no entry of its own in localValues is first announced through notifyTopic().
 * The announcement deliberately happens BEFORE the value is stored and emitted, so that
 * code reacting to the announcement can subscribe to the topic's local events in time to
 * see this first change.
 *
 * @param {string} topic - key under which the value is stored; also used as the event name
 * @param {*} value - the new value; emitted as the pair [topic, value]
 */
function setLocalValue(topic, value){
    //Own-property test: a bare `in` also matches names inherited from Object.prototype
    //(e.g. "constructor", "toString"), which would leave such topics unannounced forever.
    var isKnownTopic = Object.prototype.hasOwnProperty.call(localValues, topic);
    if( ! isKnownTopic ){
        notifyTopic(topic);
    }
    localValues[topic] = value;
    localValueEmitter.emit(topic, [topic, value]);
}
/**
 * Turns a stream into a Property and registers lodash's _.noop as a value
 * listener on it, which activates the property.
 * @param stream - an observable exposing toProperty()
 * @returns the result of the onValue() call (presumably the property itself — confirm against the Kefir docs)
 */
function configureProperty(stream){
    var property = stream.toProperty();
    return property.onValue(_.noop);
}
/**
 * Lazily creates, caches and returns the property for values set locally on a topic.
 * On first request the property is built from the topic's events on localValueEmitter
 * and passed through configureProperty(); later requests return the cached instance.
 *
 * @param {string} topic - topic name, used both as cache key and as event name
 * @returns the cached result of configureProperty() for this topic
 */
function getLocalValueProp(topic){
    //Own-property test: a bare `in` would treat inherited names such as "toString" as
    //already cached and hand back the Object.prototype member instead of a property.
    if( ! Object.prototype.hasOwnProperty.call(localValueProps, topic) ){
        var localStream = Kefir.fromEvents(localValueEmitter, topic);
        localValueProps[topic] = configureProperty(localStream);
    }
    return localValueProps[topic];
}
/**
 * Lazily creates, caches and returns the property for values received via broadcast on a topic.
 * On first request the property is built from the topic's events on remoteValueEmitter,
 * prefixed with the constant pair [topic, undefined], and passed through configureProperty();
 * later requests return the cached instance.
 *
 * @param {string} topic - topic name, used both as cache key and as event name
 * @returns the cached result of configureProperty() for this topic
 */
function getRemoteValueProp(topic){
    //Own-property test: a bare `in` would treat inherited names such as "toString" as
    //already cached and hand back the Object.prototype member instead of a property.
    if( ! Object.prototype.hasOwnProperty.call(remoteValueProps, topic) ){
        var remoteStream = Kefir.fromEvents(remoteValueEmitter, topic);
        //starts with the value pair [topic, undefined], followed by the actual remote events
        var initiallyUndefinedStream = Kefir.concat([Kefir.constant([topic, void(0)]), remoteStream]);
        remoteValueProps[topic] = configureProperty(initiallyUndefinedStream);
    }
    return remoteValueProps[topic];
}
//REMOTE -> LOCAL: for every announced topic, follow its remote property and apply defined values locally
var broadcastMonitor = notifyTopicStream.flatMap(function(topic){
    return getRemoteValueProp(topic);
});
broadcastMonitor.onValue(_.spread(function applyRemoteValue(topic, value){
    if(value === void(0)){
        return; //an undefined value (such as the initial [topic, undefined] pair) carries nothing to apply
    }
    setLocalValue(topic, value);
}));
/** Streams local topic change events where
* - it is a string value
* - last received remote value doesn't match (else would create an endless loop of rebroadcast)
* ...therefore new value should be broadcast.
*
* Pipeline, for each topic announced on notifyTopicStream:
* 1. take the first event of the topic's local property;
* 2. continue only if the value in that first event is a string;
* 3. switch to a stream built with sampledBy from the topic's remote property and local property,
*    whose combinator collects its arguments into an array; the following steps extract the two
*    values, drop pairs whose values are strictly equal, and emit [topic, localValue].
*
* NOTE(review): step 3 subscribes to the local property from inside a callback that is itself
* running in response to that same property's first event. Presumably the new subscriber misses
* the in-flight event and, depending on when the Kefir version in use records a property's current
* value relative to dispatching it, is not handed it as a current value either. That would match
* the DEBUG observations below — confirm against the Kefir version in use.
* */
var localChangeStream = notifyTopicStream.flatMap(function(topic){
return getLocalValueProp(topic).take(1); //sample first value DEBUG: this is called with topic="hello"
}).filter(_.spread(function(topic, value){
return typeof value === "string"; //test for string value DEBUG: this line is called once with [topic, value] == ["hello", "Earth"]
})).flatMap(_.spread(function(topic,value){
//construct a stream which serves only local updates (ignores those which are already mirrored by remote)
return getRemoteValueProp(topic).sampledBy(getLocalValueProp(topic), function(){ //DEBUG: this line is called once with [topic, value]==["hello","Earth"]
return Array.prototype.slice.call(arguments); //DEBUG: this combination operator and its dependent callbacks below are NEVER CALLED!!!
}).map(_.spread(function(remoteEvent, localEvent){
return [remoteEvent[1], localEvent[1]]; //extracts only the value from each [topic,value] event
})).filter(_.spread(function(remoteValue, localValue){
return remoteValue !== localValue; //check that local value change wasn't itself the result of an incoming broadcast value
})).map(_.spread(function(remoteValue, localValue){
return [topic, localValue]; //normalize the event format ready for broadcast
}));
}));
//LOCAL -> REMOTE: each event from localChangeStream is re-emitted as a 'remote' event after a delay (emulating network)
localChangeStream.onValue(_.spread(function(topic, value){
    var deliverRemotely = function(){
        remoteValueEmitter.emit(topic, [topic,value]);
    };
    _.delay(deliverRemotely, 100);
}));
/*
//Optional follow-up scenario, currently disabled:
//skip initial undefined value, and subsequent 'Earth' value, then issue 'Mars'
getRemoteValueProp("hello").skip(2).take(1).onValue(function(){
remoteValueEmitter.emit("hello", ["hello", "Mars"]); //should trigger change to local topic, but not be rebroadcast
});
*/
//Kick off the demonstration with the first local write for topic "hello".
//This must run after all the subscriptions above have been set up.
setLocalValue("hello", "Earth"); //DEBUG: should trigger broadcast of topic through localChangeStream, but doesn't
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment