Skip to content

Instantly share code, notes, and snippets.

@yelouafi yelouafi/Stream.js
Created Apr 24, 2015

Embed
What would you like to do?
function toArray(args) {
return Array.prototype.slice.call(args);
}
function doLater(act) {
setTimeout(act, 0);
}
function eachKey(obj, f) {
for(var key in obj) {
if( obj.hasOwnProperty(key) )
f(key, obj[key]);
}
}
function adtcase (base, proto, key) {
return (...args) => {
var inst = new base();
eachKey(proto(...args), (key, val) => { inst[key] = val })
inst['is'+key] = true;
return inst;
}
}
function adt(base, variants) {
eachKey(variants, (key, v) => {
if(typeof v === 'function')
base[key] = adtcase(base, v, key);
else {
base[key] = v;
v['is'+key] = true;
}
})
}
function Future() {
this.slots = [];
}
Future.prototype.ready = function(slot) {
if(this.completed)
slot(this.value);
else
this.slots.push(slot);
}
Future.prototype.complete = function(val) {
var me = this;
if( this.completed )
throw "Can't change the outcome of a future!";
this.completed = true;
this.value = val;
for(var i=0, len=me.slots.length; i<len; i++) {
this.slots[i](val);
}
doLater(function() {
this.slots = null;
});
}
Future.prototype.map = function(fn) {
var fut = new Future();
this.ready(function(val) {
fut.complete( fn(val) );
});
return fut;
}
Future.prototype.flatten = function() {
var fut = new Future();
this.ready(function(res) {
if(res instanceof Error)
fut.complete(res);
else {
res.ready( function(val){
fut.complete(val);
});
}
});
return fut;
}
Future.prototype.flatMap = function( fn ) {
return this.map(fn).flatten();
}
Future.lift = function(fn) {
return function() {
var args = toArray(arguments),
ctx = this;
return bindArg(0, []);
function bindArg(index, actArgs) {
return args[index].flatMap(function(val) {
actArgs = actArgs.concat(val);
return (index < args.length - 1) ?
bindArg(index+1, actArgs) :
Future.unit( fn.apply(ctx, actArgs) );
});
}
}
}
Future.prototype.do = function(task) {
return this.flatMap(function(v) {
return Future.task(function(){
return task(v);
});
});
}
Future.never = function() {
return new Future();
}
Future.unit = function(val) {
var fut = new Future();
fut.complete(val);
return fut;
}
Future.delay = function(v, ms) {
var f = new Future();
setTimeout(function(){
f.complete(v);
}, ms);
return f;
}
Future.task = function(task) {
var fut = new Future();
doLater(function(){
fut.complete( task() );
});
return fut;
}
function Stream() {}
adt( Stream, {
Empty : new Stream(),
Cons : (head, tail) => ({head, tail}),
Future : future => ({future})
})
Stream.one = x => Stream.Cons(x, Stream.Empty);
// map : (Stream a, a -> b ) -> Stream b
Stream.prototype.map = function(f) {
return this.isEmpty ? Stream.Empty :
this.isCons ? Stream.Cons( f(this.head), this.tail.map(f) ) :
/* isFuture */ Stream.Future( this.future.map( s => s.map(f) ) );
}
// concat : (Stream a, Stream a ) -> Stream a
Stream.prototype.concat = function(Stream) {
return this.isEmpty ? Stream.Empty :
this.isCons ? Stream.Cons( this.head, this.tail.concat(Stream) ) :
/* isFuture */ Stream.Future( this.future.map( s => s.concat(Stream) ) );
}
// filter : (Stream a, a -> Boolean ) -> Stream a
Stream.prototype.filter = function(f) {
return this.isEmpty ? Stream.Empty :
this.isCons ?
(f(this.head) ?
Stream.Cons( this.head, this.tail.filter(f) ) :
this.tail.filter(f)) :
/* isFuture */ Stream.Future( this.future.map( s => s.filter(f) ) );
}
// sort : Stream a -> Stream a
Stream.prototype.sort = function() {
return this.isEmpty ? Stream.Empty :
this.isCons ?
Stream.Lazy( () => this.tail.filter( x => x < this.head ).sort() )
.concat( Stream.one(this.head) )
.concat( Stream.Lazy(() => this.tail.filter(x => x >= this.head)).sort() ) :
/* isFuture */ Stream.Future( this.future.map( s => s.sort() ) );
}
// take : (Stream a, Integer ) -> Stream a
Stream.prototype.take = function(n) {
return this.isEmpty || n <= 0 ?
Stream.Empty :
this.isCons ?
Stream.Cons( this.head, this.tail.take(n-1) ) :
/* isFuture */
Stream.Future( this.future.map( s => s.take(n) ) );
}
// delay : (Stream a, Number ) -> Stream a
Stream.prototype.delay = function(millis) {
return this.isEmpty ? Stream.Empty :
this.isCons ? Stream.Future( Future.delay(this, millis) ) :
/* isFuture */ Stream.Future( this.future.flatMap( s => Future.delay(s, millis) ) );
}
// do : (Stream a, a -> b ) -> Stream b
Stream.prototype.do = function(action) {
return this.isEmpty ? Stream.Empty :
this.isCons ? Stream.Cons( action(this.head), this.tail.do(action) ) :
/* isFuture */ Stream.Future( this.future.map( s => s.do(action) ) );
}
Stream.impl = function() {
// a Future to hold the next tail
var nextSF = new Future(),
stream = Stream.Future(nextSF);
stream.push = function(data) {
var curSF = nextSF,
head = data,
tail = new Future();
nextSF = tail;
curSF.complete( Stream.Cons( head, Stream.Future(nextSF) ) );
};
stream.end = () => nextSF.complete(Stream.Empty);
return stream ;
}
Stream.array = function(array) {
return from(0);
function from(index){
return index < array.length ?
Stream.Cons( array[index], from(index+1) ) :
Stream.Empty;
}
}
Stream.repeat = function(v, interval, count) {
var s = Stream.impl();
var iv = setInterval(function() {
if(count)
s.push(v)
else {
clearInterval(iv);
s.end(v);
}
count--;
}, interval);
return s;
}
function log(val) {
console.log(val);
return val;
}
Stream.array(['hello', 'world']).delay(2000).do(log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.