Created
September 18, 2015 10:30
-
-
Save cefn/99c69c0ab44b091a9ea6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require("lodash"); | |
/** A draft approach to substituting Kefir streams with a bespoke library, suitable as a means of replacing the chained filters by withHandler calls. | |
* Benefits: more lightweight, (not cloning a stream for every operation), more debuggable (inspectable through the stack). | |
* */ | |
function Listenable(){ | |
this.listeners = {}; | |
this.active = false; | |
} | |
Listenable.prototype = { | |
activate:function(){ | |
this.active = true; | |
}, | |
deactivate:function(){ | |
this.active = false; | |
}, | |
traverseListeners:function(type){ | |
var list = this.listeners[type]; | |
if(arguments.length === 1){ | |
for(idx=0;idx<list.length;idx++){ | |
list[idx].call(null); | |
} | |
} | |
else{ | |
var args = Array.prototype.slice.apply(arguments, [1]); | |
for(idx=0;idx<list.length;idx++){ | |
list[idx].apply(null, args); | |
} | |
} | |
}, | |
addListener:function(type, listener){ | |
if(! (type in this.listeners)){ | |
this.listeners[type]=[]; | |
} | |
this.listeners[type].push(listener); | |
if(!this.active){ | |
this.activate(); | |
} | |
}, | |
removeListener:function(type,listener){ | |
if(type in this.listeners){ | |
var list = this.listeners[type]; | |
var length = list.length; | |
_.pull(list, listener); | |
if(list.length != length){ //item was removed | |
if(list.length === 0){ //no more listeners of this type | |
delete this.listeners[type]; | |
if(Object.keys(this.listeners).length===0){ //no more listeners of any type | |
this.deactivate(); | |
} | |
} | |
return true; | |
} | |
} | |
return false; | |
} | |
}; | |
function Emitter(){ | |
Listenable.apply(this,arguments); | |
} | |
Emitter.prototype = _.create(Listenable.prototype, { | |
emit:function(value){ | |
if( "value" in this.listeners){ | |
this.traverseListeners("value",value); | |
} | |
if( "event" in this.listeners){ | |
this.traverseListeners("event",{type:"value",value:value}); | |
} | |
}, | |
error:function(error){ | |
if( "error" in this.listeners){ | |
this.traverseListeners("error",error); | |
} | |
if( "event" in this.listeners){ | |
this.traverseListeners("event",{type:"error",value:error}); | |
} | |
}, | |
end:function(){ | |
if( "end" in this.listeners){ | |
this.traverseListeners("end"); | |
} | |
if( "event" in this.listeners){ | |
this.traverseListeners("event",{type:"end"}); | |
} | |
}, | |
emitEvent:function(event){ | |
if( "event" in this.listeners){ | |
this.traverseListeners("event",event); | |
} | |
switch(event.type){ | |
case "value": | |
if("value" in this.listeners){ | |
this.traverseListeners("value",event.value); | |
} | |
break; | |
case "error": | |
if("error" in this.listeners){ | |
this.traverseListeners("error",event.value); | |
} | |
break; | |
case "end": | |
if("end" in this.listeners){ | |
this.traverseListeners("end"); | |
} | |
} | |
} | |
}); | |
function Pipe(emitter){ | |
this.handlers = []; | |
this.handlerIdx=0; | |
this.emitter = emitter; | |
} | |
Pipe.prototype = { | |
addHandler:function(handler){ | |
this.handlers.push(handler); | |
return handler; | |
}, | |
unshiftHandler:function(handler){ | |
this.handlers.unshift(handler); | |
return handler; | |
}, | |
removeHandler:function(handler){ | |
_.pull(this.handlers,handler); | |
return handler; | |
}, | |
emitEvent:function(event){ | |
if(this.handlerIdx < this.handlers.length){ | |
this.handlers[this.handlerIdx++](event, this); //increments handler entering stack layer | |
this.handlerIdx--; //decrements exiting stack layer | |
} | |
else{ //overran the handler list, so notify external listeners | |
this.emitter.emitEvent(event); | |
} | |
}, | |
emit:function(value){ | |
this.emitEvent({type:"value",value:value}); | |
}, | |
error:function(error){ | |
this.emitEvent({type:"error",value:error}); | |
}, | |
end:function(){ | |
this.emitEvent({type:"end"}); | |
} | |
} | |
function Stream(){ | |
this.out = new Emitter(); | |
this.in = new Pipe(this.out); | |
var oldEnd = this.out.end; | |
this.out.end = function(){ | |
oldEnd.apply(this, arguments); | |
if(this.isactive){ | |
this.deactivate(); | |
} | |
} | |
var oldEmitEvent = this.out.emitEvent; | |
this.out.emitEvent = function(event){ | |
oldEmitEvent.apply(this, arguments); | |
if(event.type==="end" && this.isactive){ | |
this.deactivate(); | |
} | |
} | |
} | |
Stream.prototype = { | |
onValue:function(subscriber){ | |
this.out.addListener("value",subscriber); | |
}, | |
onError:function(subscriber){ | |
this.out.addListener("error",subscriber); | |
}, | |
onEnd:function(subscriber){ | |
this.out.addListener("end",subscriber); | |
}, | |
onAny:function(subscriber){ | |
this.out.addListener("event",subscriber); | |
}, | |
offValue:function(subscriber){ | |
this.out.removeListener("value",subscriber); | |
}, | |
offError:function(subscriber){ | |
this.out.removeListener("error",subscriber); | |
}, | |
offEnd:function(subscriber){ | |
this.out.removeListener("end",subscriber); | |
}, | |
offAny:function(subscriber){ | |
this.out.removeListener("event",subscriber); | |
}, | |
branch:function(){ | |
var thisStream = this; | |
var thatStream = new Stream(); | |
function handleEvent(event){ | |
thatStream.in.emitEvent(event); | |
} | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
thisStream.onAny(handleEvent); | |
}; | |
var oldDeactivate = thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
thisStream.offAny(handleEvent); | |
oldDeactivate.apply(this, arguments); | |
}; | |
return thatStream; | |
}, | |
cache:function(){ | |
if(! ("cachedEvent" in this)){ | |
this.cachedEvent = null; | |
this.in.unshiftHandler(function(event, emitter){ | |
if(event.type==="value"){ | |
cachedEvent = event; | |
} | |
}); | |
this.onValue = function(subscriber){ | |
Stream.prototype.onValue.apply(this, subscriber); | |
subscriber(cachedEvent.value); | |
}; | |
this.onAny = function(subscriber){ | |
Stream.prototype.onAny.apply(this, subscriber); | |
subscriber(cachedEvent); | |
}; | |
} | |
}, | |
uncache:function(){ | |
if("cachedEvent" in this){ | |
delete this.cachedEvent; | |
//stop intercepting calls | |
this.onValue = Stream.prototype.onValue; | |
this.onAny = Stream.prototype.onAny; | |
} | |
}, | |
filter:function(){ | |
return this.in.addHandler(createFilterHandler.apply(null, arguments)); | |
}, | |
map:function(){ | |
return this.in.addHandler(createMapHandler.apply(null, arguments)); | |
}, | |
take:function(){ | |
return this.in.addHandler(createTakeHandler.apply(null, arguments)); | |
}, | |
flatten:function(){ | |
return this.in.addHandler(flattenHandler); | |
}, | |
log:function(prefix){ | |
if(typeof prefix === "undefined"){ | |
prefix = ""; | |
} | |
else{ | |
prefix = prefix + ": "; | |
} | |
this.onAny(function(value){ | |
console.log(prefix + JSON.stringify(value)); | |
}); | |
}, | |
fps:function(){ | |
var count = 0; | |
var resetTime = Date.now(); | |
var triggerStream = interval(1000); | |
this.onValue(function(){ count++; }); | |
triggerStream.map(function(){ | |
var now = Date.now(); | |
var timePassed = now - resetTime * 0.001; | |
var fps = count/timePassed; | |
count = 0; | |
resetTime = now; | |
return fps; | |
}); | |
return triggerStream; | |
} | |
}; | |
function flattenHandler(event, emitter){ | |
if(event.type==="value"){ | |
event.value.map(function(entry){ | |
emitter.emit(entry); | |
}) | |
} | |
else{ | |
emitter.emitEvent(event); | |
} | |
} | |
function createFilterHandler(fn){ | |
return function(event, emitter){ | |
if(event.type==="value"){ | |
if(fn(event.value)){ | |
emitter.emitEvent(event); | |
} | |
} | |
} | |
} | |
function createMapHandler(fn){ | |
return function(event, emitter){ | |
if(event.type==="value"){ | |
emitter.emit(fn(event.value)); | |
} | |
else{ | |
emitter.emitEvent(event); | |
} | |
} | |
} | |
/* | |
function createDiffHandler(prev){ | |
} | |
function createFlatMapHandler(fn){ | |
} | |
function createLastHandler(){ | |
} | |
function genericStream(fn){ //equivalent to Kefir.stream or Bacon.fromBinder | |
} | |
function samplingStream(sampledStream, triggerStream){ | |
} | |
*/ | |
function createTakeHandler(count){ | |
function handler(event,emitter){ | |
emitter.emitEvent(event); | |
if(event.type === "value"){ | |
handler.count++; | |
if(handler.count===count){ | |
emitter.end(); | |
return; | |
} | |
} | |
} | |
handler.count = 0; | |
return handler; | |
} | |
function listenerStream(eventSource, eventType){ //fromEvents | |
var thatStream = new Stream(); | |
function sourceHandler(){ | |
thatStream.in.emit(Array.prototype.slice.apply(arguments)); | |
}; | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
eventSource.on(eventType, sourceHandler); | |
}; | |
var oldDeactivate= thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
eventSource.off(eventType, sourceHandler); | |
oldDeactivate.apply(this, arguments); | |
}; | |
} | |
function sample(sourceStream,triggerStream, fn){ | |
if(typeof fn === "undefined"){ | |
fn = null; | |
} | |
var thatStream = new Stream(); | |
var cachedSourceEvent = null; | |
function anySourceHandler(sourceEvent){ | |
if(sourceEvent.type==="value"){ | |
cachedSourceEvent = sourceEvent; | |
} | |
else if(sourceEvent.type==="end") { | |
thatStream.in.emitEvent(sourceEvent); | |
} | |
} | |
function anyTriggerHandler(triggerEvent){ | |
if(triggerEvent.type==="value" && cachedSourceEvent !== null){ | |
if(fn===null){ | |
thatStream.in.emitEvent(cachedSourceEvent); | |
} | |
else{ | |
thatStream.in.emit(fn(cachedSourceEvent.value,triggerEvent.value)); | |
} | |
} | |
else if(triggerEvent.type==="end") { | |
thatStream.in.emitEvent(triggerEvent); | |
} | |
} | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
sourceStream.onAny(anySourceHandler); | |
triggerStream.onAny(anyTriggerHandler); | |
}; | |
var oldDeactivate= thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
sourceStream.offAny(anySourceHandler); | |
triggerStream.offAny(anyTriggerHandler); | |
oldDeactivate.apply(this, arguments); | |
}; | |
return thatStream; | |
} | |
function concat(streams){ | |
var thatStream = new Stream(); | |
var remaining = Array.prototype.slice.apply(streams); | |
var eventSubscriber; | |
var eventUnsubscriber; | |
var eventHandler; | |
eventSubscriber=function(){ | |
if(remaining.length > 0){ | |
remaining[0].onAny(eventHandler); | |
} | |
else{ | |
thatStream.in.emitEvent({type:"end"}); | |
} | |
}; | |
eventUnsubscriber=function(){ | |
remaining[0].offEvent(eventHandler); | |
}; | |
eventHandler = function(event){ | |
if(event.type==="end"){ | |
remaining.shift(); | |
eventSubscriber(); | |
} | |
else{ | |
thatStream.in.emitEvent(event); | |
} | |
}; | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
eventSubscriber(); | |
}; | |
var oldDeactivate = thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
eventUnsubscriber(); | |
oldDeactivate.apply(this, arguments); | |
}; | |
return thatStream; | |
} | |
function mergeStreams(streams){ | |
var thatStream = new Stream(); | |
var eventHandler; | |
eventHandler = function(event){ | |
stream.emitEvent(event); | |
}; | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
streams.map(function(stream){ | |
stream.onEvent(eventHandler); | |
}); | |
}; | |
var oldDeactivate = thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
streams.map(function(stream){ | |
stream.offEvent(eventHandler); | |
}); | |
oldDeactivate.apply(this, arguments); | |
}; | |
} | |
function combineStreams(streams, fn){ | |
if(typeof fn === "undefined"){ | |
fn = function(){ | |
return Array.prototype.slice.apply(arguments); | |
}; | |
} | |
var thatStream = new Stream(); | |
var lastValues; | |
var gotValues; | |
var eventHandlers = streams.map(function(stream, idx){ | |
return function(event){ | |
if(event.type==="value"){ | |
lastValues[idx]=value; | |
if(gotValues !== null){ | |
if(!gotValues[idx]){ | |
gotValues[idx]=true; | |
if(_.every(gotValues)){ | |
gotValues = null; | |
} | |
} | |
} | |
if(gotValues === null){ | |
thatStream.emit(fn.apply(null,lastValues)); | |
} | |
} | |
} | |
}); | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
streams.map(function(stream, idx){ | |
lastValues = new Array(streams.length); | |
gotValues = new Array(streams.length); | |
_.fill(gotValues,false) | |
stream.onValue(eventHandlers[idx]); | |
}); | |
}; | |
var oldDeactivate = thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
streams.map(function(stream, idx){ | |
stream.offValue(eventHandlers[idx]); | |
}); | |
oldDeactivate.apply(this, arguments); | |
}; | |
return thatStream; | |
} | |
function promiseLast(stream){ | |
var eventHandler; | |
var cached=null; | |
var promise = new Promise(function(resolve, reject){ | |
eventHandler=function(event){ | |
switch(event.type){ | |
case "value": | |
cached = event; | |
break; | |
case "error": | |
cached = event; | |
break; | |
case "end": | |
if(cached !==null){ | |
if(cached.type==="value"){ | |
resolve(cached.value); | |
} | |
else{ | |
reject(cached.value); | |
} | |
} | |
break; | |
} | |
}; | |
stream.onAny(eventHandler); | |
}); | |
promise.eventHandler = eventHandler; | |
return promise; | |
}; | |
function once(value){ | |
var thatStream = new Stream(); | |
thatStream.onValue = function(subscriber){ | |
thatStream.out.addListener("value",subscriber); | |
thatStream.in.emit(value); | |
thatStream.out.removeListener("value",subscriber); | |
} | |
thatStream.onAny = function(subscriber){ | |
thatStream.out.addListener("event",subscriber); | |
thatStream.in.emit(value); | |
thatStream.in.end(); | |
thatStream.out.removeListener("event",subscriber); | |
} | |
return thatStream; | |
} | |
function interval(period, value){ | |
var thatStream = new Stream(); | |
var intervalId=null; | |
function tick(){ | |
thatStream.in.emit(value); | |
} | |
var oldActivate = thatStream.out.activate; | |
thatStream.out.activate = function(){ | |
oldActivate.apply(this, arguments); | |
intervalId=setInterval(tick, period); | |
}; | |
var oldDeactivate = thatStream.out.deactivate; | |
thatStream.out.deactivate = function(){ | |
clearInterval(intervalId); | |
intervalId=null; | |
oldDeactivate.apply(this, arguments); | |
}; | |
return thatStream; | |
} | |
function sequence(arr){ | |
var thatStream = new Stream(); | |
thatStream.onValue = function(subscriber){ | |
thatStream.out.addListener("value",subscriber); | |
arr.map(function(entry){ | |
thatStream.in.emit(entry); | |
}); | |
thatStream.out.removeListener("value",subscriber); | |
} | |
thatStream.onAny = function(subscriber){ | |
thatStream.out.addListener("event",subscriber); | |
arr.map(function(entry){ | |
thatStream.in.emit(entry); | |
}); | |
thatStream.in.end(); | |
thatStream.out.removeListener("event",subscriber); | |
} | |
return thatStream; | |
} | |
exports.Stream = Stream; | |
exports.once = once; | |
exports.sequence = sequence; | |
exports.interval = interval; | |
exports.concat = concat; | |
exports.sample = sample; | |
exports.promiseLast = promiseLast; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment