Skip to content

Instantly share code, notes, and snippets.

@yelouafi
Created June 12, 2015 00:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yelouafi/9b596653f68aa9788758 to your computer and use it in GitHub Desktop.
Save yelouafi/9b596653f68aa9788758 to your computer and use it in GitHub Desktop.
"use strict";
/**
 * Minimal event source with two channels: values and a terminal "end" signal.
 *
 * Handlers are kept in registration order and are never removed.
 */
class Emitter {
  constructor() {
    /** Handlers invoked with each emitted value. */
    this.slots = [];
    /** Handlers invoked (without arguments) when the emitter ends. */
    this.ends = [];
  }

  /**
   * Registers a value handler.
   * @param {function(*): void} h - called with every value passed to `emit`.
   */
  onValue(h) {
    this.slots.push(h);
  }

  /**
   * Registers an end handler.
   * @param {function(): void} h - called each time `end` is invoked.
   */
  onEnd(h) {
    this.ends.push(h);
  }

  /**
   * Delivers `v` to every value handler, in registration order.
   * The length is re-read on each iteration, so a handler registered while
   * a value is being dispatched also receives that value.
   * @param {*} v - value to deliver.
   */
  emit(v) {
    for (let i = 0; i < this.slots.length; i++) {
      this.slots[i](v);
    }
  }

  /**
   * Invokes every end handler, in registration order.
   * The loop is bounded by the number of END handlers; bounding it by the
   * number of value handlers would skip end handlers or call `undefined`
   * whenever the two lists differ in length.
   */
  end() {
    for (let i = 0; i < this.ends.length; i++) {
      this.ends[i]();
    }
  }
}
/**
 * Observable built from two subscription functions.
 *
 * @constructor
 * @param {function(function(*): void): void} on - registers a value handler.
 * @param {function(function(): void): void} end - registers an end handler.
 */
function Obs(on, end) {
  // Stores both subscription functions as own properties, `on` first.
  Object.assign(this, { on: on, end: end });
}
/**
 * Returns an observable whose values are the source values passed through `f`.
 *
 * Nothing is registered on the source until the result's `on` is called;
 * each such call registers its own handler on the source. The result
 * reuses the source's `end` subscription function unchanged.
 *
 * @param {function(*): *} f - transformation applied to every value.
 * @returns {Obs} the mapped observable.
 */
Obs.prototype.map = function(f) {
  const source = this;
  const subscribe = (handler) => {
    source.on((value) => {
      handler(f(value));
    });
  };
  return new Obs(subscribe, source.end);
};
/**
 * Returns an observable of running accumulations: for each source value the
 * accumulator becomes `f(acc, value)` and the new accumulator is delivered.
 *
 * The accumulator is shared by all subscribers of the result. Only the
 * handler that subscribed first advances it; handlers subscribed later just
 * receive the current accumulator when the source delivers a value.
 *
 * @param {function(*, *): *} f - folding function `(acc, value) => newAcc`.
 * @param {*} acc - initial accumulator.
 * @returns {Obs} observable of accumulator snapshots; shares the source's `end`.
 */
Obs.prototype.scan = function(f, acc) {
  const source = this;
  let leader; // first handler ever subscribed; the only one that folds
  const subscribe = (handler) => {
    if (!leader) {
      leader = handler;
    }
    source.on((value) => {
      if (handler === leader) {
        acc = f(acc, value);
      }
      handler(acc);
    });
  };
  return new Obs(subscribe, source.end);
};
/**
 * Maps every source value to an inner observable via `f` and merges the
 * values of all inner observables into a single result.
 *
 * The source is subscribed immediately, when `flatMap` is called. The result
 * ends once the source AND every inner observable created so far have ended.
 *
 * @param {function(*): Obs} f - produces an inner observable for a source value.
 * @returns {Obs} observable carrying the values of all inner observables.
 */
Obs.prototype.flatMap = function(f) {
  const me = this;
  const em = new Emitter();
  // Number of streams that still have to end: starts at 1 for the source
  // itself and grows by one for every inner observable that is subscribed.
  let pending = 1;

  function onEnd() {
    pending -= 1;
    if (pending === 0) {
      em.end();
    }
  }

  me.on(function(v) {
    const nestedObs = f(v);
    // Count the inner stream before subscribing; without this the first
    // stream to end would terminate the merged result prematurely.
    pending += 1;
    nestedObs.on(function(inner) {
      em.emit(inner);
    });
    nestedObs.end(onEnd);
  });
  me.end(onEnd);

  return new Obs(em.onValue.bind(em), em.onEnd.bind(em));
};
/**
 * Combines this observable with `o2` positionally: the n-th value of this
 * observable and the n-th value of `o2` are passed to `f`, and the result is
 * emitted. Values are buffered until their counterpart arrives.
 *
 * Both sources are subscribed immediately, when `zipWith` is called.
 * The result ends exactly once, as soon as no further pair can be formed:
 * when a source has ended and all of its buffered values have been paired.
 * Values arriving after that point are ignored.
 *
 * @param {Obs} o2 - the observable to pair with.
 * @param {function(*, *): *} f - combines one value from each source.
 * @returns {Obs} observable of combined values.
 */
Obs.prototype.zipWith = function(o2, f) {
  const me = this;
  const em = new Emitter();
  const buffer1 = [];
  const buffer2 = [];
  let ended1 = false;
  let ended2 = false;
  let done = false; // guards against ending twice and emitting after the end

  function tryEnd() {
    if (done) {
      return;
    }
    // A side is exhausted when it has ended and nothing of it is left to pair.
    const exhausted1 = ended1 && buffer1.length === 0;
    const exhausted2 = ended2 && buffer2.length === 0;
    if (exhausted1 || exhausted2) {
      done = true;
      em.end();
    }
  }

  function tryEmit() {
    while (buffer1.length && buffer2.length) {
      em.emit(f(buffer1.shift(), buffer2.shift()));
    }
    tryEnd();
  }

  me.on(function(v) {
    if (done) {
      return;
    }
    buffer1.push(v);
    tryEmit();
  });
  o2.on(function(v) {
    if (done) {
      return;
    }
    buffer2.push(v);
    tryEmit();
  });
  me.end(function() {
    ended1 = true;
    tryEnd();
  });
  o2.end(function() {
    ended2 = true;
    tryEnd();
  });

  return new Obs(em.onValue.bind(em), em.onEnd.bind(em));
};
/**
 * Folds all source values into a single result.
 *
 * The source is subscribed immediately, when `reduce` is called. Every value
 * replaces the accumulator with `f(acc, value)`; when the source ends, the
 * accumulator is emitted once and the result ends.
 *
 * @param {function(*, *): *} f - folding function `(acc, value) => newAcc`.
 * @param {*} acc - initial accumulator.
 * @returns {Obs} observable that delivers the final accumulator, then ends.
 */
Obs.prototype.reduce = function(f, acc) {
  const result = new Emitter();
  let total = acc;

  const fold = (value) => {
    total = f(total, value);
  };
  const finish = () => {
    result.emit(total);
    result.end();
  };

  this.on(fold);
  this.end(finish);

  const subscribeValue = result.onValue.bind(result);
  const subscribeEnd = result.onEnd.bind(result);
  return new Obs(subscribeValue, subscribeEnd);
};
/**
 * Creates an observable that delivers the elements of `arr` in index order
 * and then ends.
 *
 * Delivery is deferred with `setImmediate`, so handlers registered
 * synchronously after this call receive every element.
 *
 * @param {Array} arr - elements to deliver; read when the deferred callback runs.
 * @returns {Obs} observable over the elements of `arr`.
 */
Obs.array = function(arr) {
  const em = new Emitter();

  const flush = () => {
    let index = 0;
    while (index < arr.length) {
      em.emit(arr[index]);
      index++;
    }
    em.end();
  };
  setImmediate(flush);

  return new Obs(em.onValue.bind(em), em.onEnd.bind(em));
};
/**
 * Creates an observable that delivers `min`, `min + 1`, ... up to and
 * including `max`, and then ends. Delivers nothing but the end signal when
 * `min > max`.
 *
 * Delivery is deferred with `setImmediate`, so handlers registered
 * synchronously after this call receive every value.
 *
 * @param {number} min - first value delivered.
 * @param {number} max - inclusive upper bound.
 * @returns {Obs} observable over the range.
 */
Obs.range = function(min, max) {
  const em = new Emitter();

  const flush = () => {
    let current = min;
    while (current <= max) {
      em.emit(current);
      current++;
    }
    em.end();
  };
  setImmediate(flush);

  return new Obs(em.onValue.bind(em), em.onEnd.bind(em));
};
/**
 * Debug helper: subscribes to this observable and prints every value, and
 * the literal 'end' when it ends, each preceded by `pref`.
 *
 * @param {*} pref - prefix passed as the first argument to `console.log`.
 * @returns {Obs} this observable, to allow chaining.
 */
Obs.prototype.log = function(pref) {
  const printValue = (value) => {
    console.log(pref, value);
  };
  const printEnd = () => {
    console.log(pref, 'end');
  };
  this.on(printValue);
  this.end(printEnd);
  return this;
};
/*
function dbl(v) {
return v * 2;
}
function sum(x,y) {
return x+y;
}
var arr = [1,2,3,4]
var arr2 = [1,2,3,4]
Obs.array(arr).zipWith(Obs.array(arr2), sum).log('')
*/
// CommonJS export: the module's value is the Obs constructor itself;
// Emitter is not exported.
module.exports = Obs;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment