Skip to content

Instantly share code, notes, and snippets.

@cefn
Created September 18, 2015 10:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cefn/99c69c0ab44b091a9ea6 to your computer and use it in GitHub Desktop.
Save cefn/99c69c0ab44b091a9ea6 to your computer and use it in GitHub Desktop.
var _ = require("lodash");
/** A draft approach to substituting Kefir streams with a bespoke library, suitable as a means of replacing the chained filters by withHandler calls.
* Benefits: more lightweight, (not cloning a stream for every operation), more debuggable (inspectable through the stack).
* */
/**
 * Minimal listener registry keyed by event type.
 *
 * `listeners` maps a type name to an array of callbacks. `active` is switched
 * on by the first `addListener` call and switched off again once the last
 * listener of every type has been removed.
 */
function Listenable(){
    this.listeners = {};
    this.active = false;
}
Listenable.prototype = {
    /** Marks this object as having at least one listener. */
    activate:function(){
        this.active = true;
    },
    /** Marks this object as having no listeners. */
    deactivate:function(){
        this.active = false;
    },
    /**
     * Calls every listener registered for `type`, passing along any extra
     * arguments given after `type`. Does nothing when no listener is
     * registered for that type.
     * @param {string} type - listener type to notify
     */
    traverseListeners:function(type){
        var list = this.listeners[type];
        if(!list){
            return;
        }
        // Iterate over a snapshot: listeners may add or remove listeners
        // (including themselves) while being notified, and mutating the
        // live array mid-loop would skip or repeat entries.
        var snapshot = list.slice();
        var args = Array.prototype.slice.call(arguments, 1);
        for(var idx = 0; idx < snapshot.length; idx++){
            snapshot[idx].apply(null, args);
        }
    },
    /**
     * Registers `listener` for `type` and activates this object if it was
     * inactive.
     * @param {string} type
     * @param {Function} listener
     */
    addListener:function(type, listener){
        if(!(type in this.listeners)){
            this.listeners[type] = [];
        }
        this.listeners[type].push(listener);
        if(!this.active){
            this.activate();
        }
    },
    /**
     * Removes every registration of `listener` for `type`. Deactivates this
     * object when that leaves no listeners of any type.
     * @param {string} type
     * @param {Function} listener
     * @returns {boolean} true when at least one registration was removed
     */
    removeListener:function(type, listener){
        if(!(type in this.listeners)){
            return false;
        }
        var list = this.listeners[type];
        var removed = false;
        // Walk backwards so splicing does not disturb the indices still to visit.
        for(var idx = list.length - 1; idx >= 0; idx--){
            if(list[idx] === listener){
                list.splice(idx, 1);
                removed = true;
            }
        }
        if(!removed){
            return false;
        }
        if(list.length === 0){ //no more listeners of this type
            delete this.listeners[type];
            if(Object.keys(this.listeners).length === 0){ //no more listeners of any type
                this.deactivate();
            }
        }
        return true;
    }
};
/**
 * A Listenable that fans events out to "value", "error", "end" and "event"
 * listeners. "event" listeners receive an object of the form
 * {type: ..., value: ...}; the other types receive the bare payload.
 */
function Emitter(){
    Listenable.apply(this, arguments);
}
/**
 * Notifies the listeners of `type` on `emitter`, but only when at least one
 * listener of that type is registered. `args` holds the arguments handed on
 * to the listeners.
 */
function dispatchIfListening(emitter, type, args){
    if(type in emitter.listeners){
        emitter.traverseListeners.apply(emitter, [type].concat(args));
    }
}
Emitter.prototype = _.create(Listenable.prototype, {
    /** Delivers a value: first to "value" listeners, then wrapped to "event" listeners. */
    emit:function(value){
        dispatchIfListening(this, "value", [value]);
        dispatchIfListening(this, "event", [{type:"value",value:value}]);
    },
    /** Delivers an error: first to "error" listeners, then wrapped to "event" listeners. */
    error:function(error){
        dispatchIfListening(this, "error", [error]);
        dispatchIfListening(this, "event", [{type:"error",value:error}]);
    },
    /** Signals the end: first to "end" listeners (no arguments), then to "event" listeners. */
    end:function(){
        dispatchIfListening(this, "end", []);
        dispatchIfListening(this, "event", [{type:"end"}]);
    },
    /**
     * Delivers an already-wrapped event: first to "event" listeners, then
     * unwrapped to the listeners matching its type. Unknown types only reach
     * the "event" listeners.
     */
    emitEvent:function(event){
        dispatchIfListening(this, "event", [event]);
        var kind = event.type;
        if(kind === "value" || kind === "error"){
            dispatchIfListening(this, kind, [event.value]);
        }
        else if(kind === "end"){
            dispatchIfListening(this, "end", []);
        }
    }
});
/**
 * A chain of handlers sitting in front of an emitter.
 *
 * Each handler is called as handler(event, pipe). A handler passes an event
 * on by calling pipe.emitEvent / emit / error / end from inside its own call;
 * the pipe then invokes the next handler, or the emitter once the handler
 * list is exhausted. `handlerIdx` records how deep the current synchronous
 * call stack is in the handler list.
 *
 * @param {Object} emitter - receives events that made it past every handler
 *                           through its emitEvent(event) method
 */
function Pipe(emitter){
    this.handlers = [];
    this.handlerIdx = 0;
    this.emitter = emitter;
}
Pipe.prototype = {
    /** Appends `handler` to the end of the chain and returns it. */
    addHandler:function(handler){
        this.handlers.push(handler);
        return handler;
    },
    /** Inserts `handler` at the front of the chain and returns it. */
    unshiftHandler:function(handler){
        this.handlers.unshift(handler);
        return handler;
    },
    /** Removes `handler` from the chain and returns it. */
    removeHandler:function(handler){
        _.pull(this.handlers,handler);
        return handler;
    },
    /**
     * Hands `event` to the handler at the current depth, or to the emitter
     * when every handler has already been passed.
     */
    emitEvent:function(event){
        if(this.handlerIdx < this.handlers.length){
            try{
                this.handlers[this.handlerIdx++](event, this); //increments handler entering stack layer
            }
            finally{
                // Always unwind, even when the handler throws; otherwise the
                // depth stays raised and later events bypass the handlers.
                this.handlerIdx--; //decrements exiting stack layer
            }
        }
        else{ //overran the handler list, so notify external listeners
            this.emitter.emitEvent(event);
        }
    },
    /** Sends `value` through the chain as a "value" event. */
    emit:function(value){
        this.emitEvent({type:"value",value:value});
    },
    /** Sends `error` through the chain as an "error" event. */
    error:function(error){
        this.emitEvent({type:"error",value:error});
    },
    /** Sends an "end" event through the chain. */
    end:function(){
        this.emitEvent({type:"end"});
    }
};
/**
 * A stream made of an input Pipe (`in`) feeding an output Emitter (`out`).
 *
 * The emitter's `end` and `emitEvent` are wrapped per instance so that, once
 * an end has been delivered to the listeners, the emitter deactivates itself
 * if it is still active.
 */
function Stream(){
    this.out = new Emitter();
    this.in = new Pipe(this.out);
    var oldEnd = this.out.end;
    this.out.end = function(){
        oldEnd.apply(this, arguments);
        // The flag maintained by activate()/deactivate() is `active`.
        if(this.active){
            this.deactivate();
        }
    };
    var oldEmitEvent = this.out.emitEvent;
    this.out.emitEvent = function(event){
        oldEmitEvent.apply(this, arguments);
        if(event.type==="end" && this.active){
            this.deactivate();
        }
    };
}
/**
 * Subscription, branching, caching and operator methods shared by every
 * Stream. Subscriptions are registered on `this.out`; operators append a
 * handler to `this.in` and return that handler (not a new stream).
 */
Stream.prototype = {
    /** Subscribes `subscriber` to values. */
    onValue:function(subscriber){
        this.out.addListener("value",subscriber);
    },
    /** Subscribes `subscriber` to errors. */
    onError:function(subscriber){
        this.out.addListener("error",subscriber);
    },
    /** Subscribes `subscriber` to the end signal. */
    onEnd:function(subscriber){
        this.out.addListener("end",subscriber);
    },
    /** Subscribes `subscriber` to every event, delivered as {type, value} objects. */
    onAny:function(subscriber){
        this.out.addListener("event",subscriber);
    },
    /** Unsubscribes a value subscriber. */
    offValue:function(subscriber){
        this.out.removeListener("value",subscriber);
    },
    /** Unsubscribes an error subscriber. */
    offError:function(subscriber){
        this.out.removeListener("error",subscriber);
    },
    /** Unsubscribes an end subscriber. */
    offEnd:function(subscriber){
        this.out.removeListener("end",subscriber);
    },
    /** Unsubscribes an any-event subscriber. */
    offAny:function(subscriber){
        this.out.removeListener("event",subscriber);
    },
    /**
     * Creates a new Stream fed by every event of this one. The new stream
     * subscribes to this stream when it activates and unsubscribes when it
     * deactivates.
     * @returns {Stream}
     */
    branch:function(){
        var thisStream = this;
        var thatStream = new Stream();
        function handleEvent(event){
            thatStream.in.emitEvent(event);
        }
        var oldActivate = thatStream.out.activate;
        thatStream.out.activate = function(){
            oldActivate.apply(this, arguments);
            thisStream.onAny(handleEvent);
        };
        var oldDeactivate = thatStream.out.deactivate;
        thatStream.out.deactivate = function(){
            thisStream.offAny(handleEvent);
            oldDeactivate.apply(this, arguments);
        };
        return thatStream;
    },
    /**
     * Starts remembering the latest value event and replays it to every
     * subscriber added afterwards through onValue / onAny. Calling it again
     * while caching is already enabled does nothing.
     */
    cache:function(){
        if("cachedEvent" in this){
            return;
        }
        var thisStream = this;
        this.cachedEvent = null;
        // NOTE(review): the handler sits at the FRONT of the pipe, so the
        // remembered event is the one entering the pipe, before any
        // filter/map handler has run — confirm that is the intended value.
        this.cacheHandler = this.in.unshiftHandler(function(event, emitter){
            if(event.type==="value"){
                thisStream.cachedEvent = event;
            }
            // Pass the event on; a handler that does not forward swallows it.
            emitter.emitEvent(event);
        });
        this.onValue = function(subscriber){
            Stream.prototype.onValue.call(this, subscriber);
            if(this.cachedEvent !== null){ // nothing to replay before the first value
                subscriber(this.cachedEvent.value);
            }
        };
        this.onAny = function(subscriber){
            Stream.prototype.onAny.call(this, subscriber);
            if(this.cachedEvent !== null){
                subscriber(this.cachedEvent);
            }
        };
    },
    /** Stops caching: removes the caching handler and the replaying subscribe methods. */
    uncache:function(){
        if("cachedEvent" in this){
            this.in.removeHandler(this.cacheHandler);
            delete this.cacheHandler;
            delete this.cachedEvent;
            //stop intercepting calls
            this.onValue = Stream.prototype.onValue;
            this.onAny = Stream.prototype.onAny;
        }
    },
    /** Appends a filter handler built from the given arguments; returns the handler. */
    filter:function(){
        return this.in.addHandler(createFilterHandler.apply(null, arguments));
    },
    /** Appends a map handler built from the given arguments; returns the handler. */
    map:function(){
        return this.in.addHandler(createMapHandler.apply(null, arguments));
    },
    /** Appends a take handler built from the given arguments; returns the handler. */
    take:function(){
        return this.in.addHandler(createTakeHandler.apply(null, arguments));
    },
    /** Appends the flatten handler; returns the handler. */
    flatten:function(){
        return this.in.addHandler(flattenHandler);
    },
    /**
     * Logs every event of this stream to the console as JSON.
     * @param {string} [prefix] - when given, printed before each line followed by ": "
     */
    log:function(prefix){
        if(typeof prefix === "undefined"){
            prefix = "";
        }
        else{
            prefix = prefix + ": ";
        }
        this.onAny(function(value){
            console.log(prefix + JSON.stringify(value));
        });
    },
    /**
     * Counts the values of this stream and returns a stream that, once per
     * second, emits the number of values per second seen since the previous
     * tick.
     * @returns {Stream}
     */
    fps:function(){
        var count = 0;
        var resetTime = Date.now();
        var triggerStream = interval(1000);
        this.onValue(function(){ count++; });
        triggerStream.map(function(){
            var now = Date.now();
            // Elapsed time in seconds; the subtraction must happen before scaling.
            var timePassed = (now - resetTime) * 0.001;
            var fps = count/timePassed;
            count = 0;
            resetTime = now;
            return fps;
        });
        return triggerStream;
    }
};
/**
 * Pipe handler that spreads a value event whose payload is a collection into
 * one emitted value per entry. Every other event is passed on untouched.
 */
function flattenHandler(event, emitter){
    if(event.type !== "value"){
        emitter.emitEvent(event);
        return;
    }
    var entries = event.value;
    entries.map(function(item){
        emitter.emit(item);
    });
}
/**
 * Builds a pipe handler that lets a value event through only when
 * fn(value) is truthy. Error and end events are always passed on, so a
 * filtered stream still reports errors and still ends.
 * @param {Function} fn - predicate called with each value
 * @returns {Function} handler(event, emitter)
 */
function createFilterHandler(fn){
    return function(event, emitter){
        if(event.type !== "value" || fn(event.value)){
            emitter.emitEvent(event);
        }
    };
}
/**
 * Builds a pipe handler that replaces each value with fn(value). Events of
 * any other type are handed on as they are.
 * @param {Function} fn - transformation applied to each value
 * @returns {Function} handler(event, emitter)
 */
function createMapHandler(fn){
    return function(event, emitter){
        if(event.type !== "value"){
            emitter.emitEvent(event);
            return;
        }
        var mapped = fn(event.value);
        emitter.emit(mapped);
    };
}
/*
function createDiffHandler(prev){
}
function createFlatMapHandler(fn){
}
function createLastHandler(){
}
function genericStream(fn){ //equivalent to Kefir.stream or Bacon.fromBinder
}
function samplingStream(sampledStream, triggerStream){
}
*/
/**
 * Builds a pipe handler that lets events through until `count` values have
 * passed, then emits an end and drops everything that arrives afterwards.
 * With a non-positive `count` the first incoming event is dropped and
 * answered with an end.
 *
 * The returned handler exposes `count` (values passed so far) and `ended`
 * (whether the limit has been reached).
 * @param {number} count - number of values to let through
 * @returns {Function} handler(event, emitter)
 */
function createTakeHandler(count){
    function handler(event,emitter){
        if(handler.ended){
            return; // limit already reached: nothing more may pass
        }
        if(handler.count < count){
            emitter.emitEvent(event);
            if(event.type === "value"){
                handler.count++;
            }
        }
        if(handler.count >= count){
            // Flag first so that events triggered by the end are dropped too.
            handler.ended = true;
            emitter.end();
        }
    }
    handler.count = 0;
    handler.ended = false;
    return handler;
}
/**
 * Creates a Stream fed by `eventType` events of `eventSource` (fromEvents).
 * Each source event is emitted as one value holding the array of arguments
 * the source passed to its listener. The source listener is attached through
 * eventSource.on when the stream activates and detached through
 * eventSource.off when it deactivates.
 * @param {Object} eventSource - object exposing on(type, fn) and off(type, fn)
 * @param {string} eventType
 * @returns {Stream}
 */
function listenerStream(eventSource, eventType){ //fromEvents
    var thatStream = new Stream();
    function sourceHandler(){
        thatStream.in.emit(Array.prototype.slice.apply(arguments));
    }
    var oldActivate = thatStream.out.activate;
    thatStream.out.activate = function(){
        oldActivate.apply(this, arguments);
        eventSource.on(eventType, sourceHandler);
    };
    var oldDeactivate= thatStream.out.deactivate;
    thatStream.out.deactivate = function(){
        eventSource.off(eventType, sourceHandler);
        oldDeactivate.apply(this, arguments);
    };
    // Without this the caller receives undefined and cannot subscribe.
    return thatStream;
}
/**
 * Creates a Stream that, on every value of `triggerStream`, emits the most
 * recent value event of `sourceStream` (nothing is emitted until the source
 * has produced one). When `fn` is supplied the emitted value is
 * fn(sourceValue, triggerValue) instead. An end from either input is passed
 * on. Both inputs are subscribed on activation and unsubscribed on
 * deactivation.
 * @param {Stream} sourceStream
 * @param {Stream} triggerStream
 * @param {Function} [fn]
 * @returns {Stream}
 */
function sample(sourceStream,triggerStream, fn){
    if(typeof fn === "undefined"){
        fn = null;
    }
    var sampled = new Stream();
    var latestSourceEvent = null;

    function onSourceEvent(sourceEvent){
        var kind = sourceEvent.type;
        if(kind === "value"){
            latestSourceEvent = sourceEvent;
            return;
        }
        if(kind === "end"){
            sampled.in.emitEvent(sourceEvent);
        }
    }

    function onTriggerEvent(triggerEvent){
        var kind = triggerEvent.type;
        if(kind === "end"){
            sampled.in.emitEvent(triggerEvent);
            return;
        }
        if(kind !== "value" || latestSourceEvent === null){
            return;
        }
        if(fn === null){
            sampled.in.emitEvent(latestSourceEvent);
            return;
        }
        sampled.in.emit(fn(latestSourceEvent.value, triggerEvent.value));
    }

    var baseActivate = sampled.out.activate;
    sampled.out.activate = function(){
        baseActivate.apply(this, arguments);
        sourceStream.onAny(onSourceEvent);
        triggerStream.onAny(onTriggerEvent);
    };
    var baseDeactivate = sampled.out.deactivate;
    sampled.out.deactivate = function(){
        sourceStream.offAny(onSourceEvent);
        triggerStream.offAny(onTriggerEvent);
        baseDeactivate.apply(this, arguments);
    };
    return sampled;
}
/**
 * Creates a Stream that relays the events of each stream in `streams` in
 * turn: when the current stream ends, the next one is subscribed, and when
 * none is left an end is emitted. The current stream is subscribed when the
 * result activates and unsubscribed when it deactivates.
 * @param {Array} streams - array (or array-like) of Streams, in relay order
 * @returns {Stream}
 */
function concat(streams){
    var thatStream = new Stream();
    var remaining = Array.prototype.slice.apply(streams);
    function eventSubscriber(){
        if(remaining.length > 0){
            remaining[0].onAny(eventHandler);
        }
        else{
            thatStream.in.emitEvent({type:"end"});
        }
    }
    function eventUnsubscriber(){
        // Nothing is subscribed once every stream has ended.
        if(remaining.length > 0){
            remaining[0].offAny(eventHandler);
        }
    }
    function eventHandler(event){
        if(event.type==="end"){
            remaining.shift();
            eventSubscriber();
        }
        else{
            thatStream.in.emitEvent(event);
        }
    }
    var oldActivate = thatStream.out.activate;
    thatStream.out.activate = function(){
        oldActivate.apply(this, arguments);
        eventSubscriber();
    };
    var oldDeactivate = thatStream.out.deactivate;
    thatStream.out.deactivate = function(){
        eventUnsubscriber();
        oldDeactivate.apply(this, arguments);
    };
    return thatStream;
}
/**
 * Creates a Stream that relays the value and error events of every stream in
 * `streams` as they occur, and ends once an end has been received for each
 * of them. All sources are subscribed when the result activates and
 * unsubscribed when it deactivates.
 * @param {Array} streams - array of Streams to merge
 * @returns {Stream}
 */
function mergeStreams(streams){
    var thatStream = new Stream();
    // Number of sources whose end has not been seen yet.
    var pendingEnds = streams.length;
    function eventHandler(event){
        if(event.type === "end"){
            pendingEnds--;
            if(pendingEnds > 0){
                return; // other sources are still running
            }
        }
        thatStream.in.emitEvent(event);
    }
    var oldActivate = thatStream.out.activate;
    thatStream.out.activate = function(){
        oldActivate.apply(this, arguments);
        streams.forEach(function(stream){
            stream.onAny(eventHandler);
        });
    };
    var oldDeactivate = thatStream.out.deactivate;
    thatStream.out.deactivate = function(){
        streams.forEach(function(stream){
            stream.offAny(eventHandler);
        });
        oldDeactivate.apply(this, arguments);
    };
    return thatStream;
}
/**
 * Creates a Stream that combines the latest value of every stream in
 * `streams`. Nothing is emitted until each source has produced at least one
 * value; from then on every new source value emits fn(latest0, latest1, ...).
 * Sources are subscribed (and the remembered values reset) when the result
 * activates, and unsubscribed when it deactivates.
 * @param {Array} streams - array of Streams to combine
 * @param {Function} [fn] - combiner; defaults to returning the latest values as an array
 * @returns {Stream}
 */
function combineStreams(streams, fn){
    if(typeof fn === "undefined"){
        fn = function(){
            return Array.prototype.slice.apply(arguments);
        };
    }
    var thatStream = new Stream();
    var lastValues;
    var gotValues; // per-source "has emitted" flags; null once every source has
    // The handlers are registered through onValue, so each one receives the
    // bare value rather than an event object.
    var valueHandlers = streams.map(function(stream, idx){
        return function(value){
            lastValues[idx]=value;
            if(gotValues !== null && !gotValues[idx]){
                gotValues[idx]=true;
                if(_.every(gotValues)){
                    gotValues = null;
                }
            }
            if(gotValues === null){
                thatStream.in.emit(fn.apply(null,lastValues));
            }
        };
    });
    var oldActivate = thatStream.out.activate;
    thatStream.out.activate = function(){
        oldActivate.apply(this, arguments);
        // Reset once, before subscribing: a source that emits synchronously
        // on subscription must not have its value wiped by a later reset.
        lastValues = new Array(streams.length);
        gotValues = new Array(streams.length);
        _.fill(gotValues,false);
        streams.forEach(function(stream, idx){
            stream.onValue(valueHandlers[idx]);
        });
    };
    var oldDeactivate = thatStream.out.deactivate;
    thatStream.out.deactivate = function(){
        streams.forEach(function(stream, idx){
            stream.offValue(valueHandlers[idx]);
        });
        oldDeactivate.apply(this, arguments);
    };
    return thatStream;
}
/**
 * Returns a Promise settled when `stream` ends: resolved with the last value
 * if the last value-or-error event was a value, rejected with the last error
 * otherwise. If the stream ends before any value or error the promise is
 * left pending. The subscribed handler is exposed as `promise.eventHandler`.
 * @param {Object} stream - object exposing onAny(handler)
 * @returns {Promise}
 */
function promiseLast(stream){
    var lastEvent = null;
    var onEvent;
    var result = new Promise(function(fulfil, fail){
        onEvent = function(event){
            var kind = event.type;
            if(kind === "value" || kind === "error"){
                lastEvent = event;
                return;
            }
            if(kind !== "end" || lastEvent === null){
                return;
            }
            if(lastEvent.type === "value"){
                fulfil(lastEvent.value);
            }
            else{
                fail(lastEvent.value);
            }
        };
        stream.onAny(onEvent);
    });
    result.eventHandler = onEvent;
    return result;
}
/**
 * Creates a Stream that delivers `value` to each subscriber at the moment it
 * subscribes. A subscriber added through onValue is registered, sent the
 * value and unregistered again; one added through onAny is additionally sent
 * an end before being unregistered.
 * @param {*} value
 * @returns {Stream}
 */
function once(value){
    var thatStream = new Stream();
    // Registers the subscriber, plays the value (and optionally an end)
    // through the pipe, then unregisters the subscriber.
    function deliver(listenerType, subscriber, withEnd){
        thatStream.out.addListener(listenerType, subscriber);
        thatStream.in.emit(value);
        if(withEnd){
            thatStream.in.end();
        }
        thatStream.out.removeListener(listenerType, subscriber);
    }
    thatStream.onValue = function(subscriber){
        deliver("value", subscriber, false);
    };
    thatStream.onAny = function(subscriber){
        deliver("event", subscriber, true);
    };
    return thatStream;
}
/**
 * Creates a Stream that emits `value` every `period` milliseconds. The timer
 * is started when the stream activates and cleared when it deactivates.
 * @param {number} period - delay between emissions, handed to setInterval
 * @param {*} [value] - value emitted on every tick
 * @returns {Stream}
 */
function interval(period, value){
    var thatStream = new Stream();
    var timer = null;
    var baseActivate = thatStream.out.activate;
    var baseDeactivate = thatStream.out.deactivate;
    thatStream.out.activate = function(){
        baseActivate.apply(this, arguments);
        timer = setInterval(function(){
            thatStream.in.emit(value);
        }, period);
    };
    thatStream.out.deactivate = function(){
        clearInterval(timer);
        timer = null;
        baseDeactivate.apply(this, arguments);
    };
    return thatStream;
}
/**
 * Creates a Stream that plays every entry of `arr`, in order, to each
 * subscriber at the moment it subscribes. A subscriber added through onValue
 * is registered, sent all entries and unregistered again; one added through
 * onAny is additionally sent an end before being unregistered.
 * @param {Array} arr - values to play
 * @returns {Stream}
 */
function sequence(arr){
    var thatStream = new Stream();
    // Pushes every entry of the array through the pipe.
    function playEntries(){
        arr.map(function(item){
            thatStream.in.emit(item);
        });
    }
    thatStream.onValue = function(subscriber){
        thatStream.out.addListener("value", subscriber);
        playEntries();
        thatStream.out.removeListener("value", subscriber);
    };
    thatStream.onAny = function(subscriber){
        thatStream.out.addListener("event", subscriber);
        playEntries();
        thatStream.in.end();
        thatStream.out.removeListener("event", subscriber);
    };
    return thatStream;
}
// Public API of this module: the Stream constructor plus the stream factories
// and helpers listed below.
// NOTE(review): listenerStream, mergeStreams and combineStreams are defined in
// this file but are not exported here — confirm whether that is intentional.
exports.Stream = Stream;
exports.once = once;
exports.sequence = sequence;
exports.interval = interval;
exports.concat = concat;
exports.sample = sample;
exports.promiseLast = promiseLast;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment