Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

View yelouafi's full-sized avatar

Yassine Elouafi yelouafi

View GitHub Profile
// any : ( Stream a, a -> Boolean ) -> Promise Boolean
// Resolves to true as soon as `pred` returns a truthy value for some element,
// and to false when the stream ends without a match.
// Rejects with the stream's `err` when an Abort node is reached first.
// `pred` is called synchronously on each Cons head, in order, and is not called
// again after the first match.
Stream.prototype.any = function(pred) {
  // Walk consecutive Cons cells with a loop instead of one recursive call per
  // cell, so a long run of already-available elements cannot overflow the stack.
  let node = this;
  for (;;) {
    if (node.isEmpty) {
      return Promise.resolve(false);
    }
    if (node.isAbort) {
      return Promise.reject(node.err);
    }
    if (!node.isCons) {
      /* isFuture */
      // Continue the search once the rest of the stream becomes available.
      return node.promise.then( s => s.any(pred) );
    }
    if (pred(node.head)) {
      return Promise.resolve(true);
    }
    node = node.tail;
  }
};
// takeUntil : (Stream a, Promise) => Stream a
// Derives a stream from this one that is bounded by the promise `untilP`.
// - Empty and Abort streams are returned unchanged.
// - A Cons keeps its head and applies takeUntil to its tail.
// - Any other stream (a Future) is replaced by a Future whose promise is a race
//   between `untilP` and this stream's own promise.
// NOTE(review): this excerpt is cut off inside Promise.race([ ... ; the closing
// brackets and the end of the function are not visible here.
Stream.prototype.takeUntil = function(untilP) {
return this.isEmpty || this.isAbort ? this
: this.isCons ? Stream.Cons( this.head, this.tail.takeUntil(untilP) )
: Stream.Future(
// can you spot a problem here ?
Promise.race([
// untilP fulfilled -> Stream.Empty ; untilP rejected -> Stream.Abort(reason)
untilP.then( _ => Stream.Empty, Stream.Abort ),
// s.takeUntil(untilP) is evaluated as soon as this.promise fulfils,
// whether or not this entry ends up winning the race
this.promise.then( s => s.takeUntil(untilP), Stream.Abort )
// takeUntil : (Stream a, Promise) => Stream a
// Derives a stream from this one that is bounded by the promise `untilP`.
// - Empty and Abort streams are returned unchanged.
// - A Cons keeps its head and applies takeUntil to its tail.
// - A Future races `untilP` against its own promise:
//     untilP fulfils first        -> Stream.Empty
//     this.promise fulfils first  -> the resolved stream, itself bounded by untilP
//     whichever rejects first     -> Stream.Abort(reason)
Stream.prototype.takeUntil = function(untilP) {
  return this.isEmpty || this.isAbort ? this
    : this.isCons ? Stream.Cons( this.head, this.tail.takeUntil(untilP) )
    : /* isFuture */
      Stream.Future(
        // Every entry of the race settles with a thunk rather than a stream, so
        // only the winning continuation is actually built. The rejection
        // handlers must return thunks too: the final `.then` calls whatever
        // value won the race.
        Promise.race([
          untilP.then( _ => () => Stream.Empty, err => () => Stream.Abort(err) ),
          this.promise.then( s => () => s.takeUntil(untilP), err => () => Stream.Abort(err) )
        ]).then( lazy => lazy() ));
};
// map : ( Stream a, a -> b | Promise b ) => Stream b
// Applies `f` to the elements of this stream. `f` may return either a plain
// value or a promise: its result is passed through Promise.resolve.
// Empty and Abort streams are returned unchanged.
// NOTE(review): this excerpt ends after the isCons branch; the isFuture branch
// and the closing brace of the function are not visible here.
Stream.prototype.map = function(f) {
let tailM, futP;
return this.isEmpty || this.isAbort ? this
: this.isCons ?
// comma expression: map the tail first, then build a promise for the mapped
// head, and finally evaluate to a Future wrapping that promise
( tailM = this.tail.map(f),
futP = Promise.resolve(f(this.head))
// a rejected result of f becomes Stream.Abort(reason)
.then( head => Stream.Cons(head, tailM), Stream.Abort ),
Stream.Future(futP))
// filter : (Stream a, a -> Boolean | Promise Boolean) => Stream a
// Keeps the elements of this stream for which `p` yields a truthy value. `p` may
// return either a plain value or a promise: its result is passed through
// Promise.resolve.
// Empty and Abort streams are returned unchanged.
// NOTE(review): this excerpt ends after the isCons branch; the isFuture branch
// and the closing brace of the function are not visible here.
Stream.prototype.filter = function(p) {
let tailF, futP;
return this.isEmpty || this.isAbort ? this
: this.isCons ?
// comma expression: filter the tail first, then decide about the head once
// the (possibly asynchronous) predicate result is known
( tailF = this.tail.filter(p),
futP = Promise.resolve( p(this.head) )
// truthy -> keep the head in front of the filtered tail; falsy -> drop it;
// a rejected predicate result becomes Stream.Abort(reason)
.then( ok => ok ? Stream.Cons(this.head, tailF) : tailF, Stream.Abort )
// NOTE(review): the line above appears to be missing a trailing comma before
// the next operand of the comma expression (compare the same spot in map)
Stream.Future(futP))
// concat : (Stream a, Stream a) => Stream a
// Produces a stream with every element of this stream followed by those of `s2`.
// - When this stream is Empty the result is `s2` itself.
// - An Abort node ends the result: `s2` is never reached.
// - When a Future is reached, the concatenation continues once its promise
//   fulfils; a rejection becomes Stream.Abort(reason).
Stream.prototype.concat = function(s2) {
  // Collect the heads of the leading run of Cons cells with a loop instead of
  // one recursive call per cell, so a long run of already-available elements
  // cannot overflow the stack.
  const heads = [];
  let node = this;
  while (!node.isEmpty && !node.isAbort && node.isCons) {
    heads.push(node.head);
    node = node.tail;
  }

  // `node` is now the first non-Cons node: Empty, Abort or Future.
  let result = node.isEmpty ? s2
    : node.isAbort ? node
    : Stream.Future(node.promise.then(s => s.concat(s2), Stream.Abort));

  // Rebuild the Cons cells from the innermost one outwards.
  for (let i = heads.length - 1; i >= 0; i--) {
    result = Stream.Cons(heads[i], result);
  }
  return result;
};
Stream.seq([1,2,3,4,5], 2000, 1000) // the 1st stream yields normally
// merge : (Stream a, Stream a) => Stream a
// Combines this stream with `s2` into a single stream.
// - Empty: the result is `s2`. Abort: the result is this aborted stream.
// - Cons: the head is kept and the tail is merged with `s2`.
// - Otherwise (this stream is a Future): when `s2` is not a Future the roles
//   are swapped, so `s2` is inspected by the branches above; when both are
//   Futures the result is a Future built from a Promise.race.
// NOTE(review): this excerpt is cut off inside Promise.race([ ... ; the raced
// promises and the end of the function are not visible here.
Stream.prototype.merge = function(s2) {
return this.isEmpty ? s2
: this.isAbort ? this
: this.isCons ? Stream.Cons( this.head, this.tail.merge(s2) )
: ( !s2.isFuture ?
// s2 is Empty, Abort or Cons: let its own branches handle it first
s2.merge(this)
: Stream.Future(
Promise.race([
// zip : (Stream a, Stream b) => Stream [a,b]
// Pairs the elements of this stream with those of `s2`, position by position.
// - When this stream is Empty or Abort it is returned as is; otherwise, when
//   `s2` is Empty or Abort, `s2` is returned.
// - Two Cons cells give a Cons holding [this.head, s2.head], followed by the
//   zip of both tails.
// - A Cons facing a Future waits for s2's promise and then zips this same cell
//   with the resolved stream; a rejection becomes Stream.Abort(reason).
// NOTE(review): the last branch (this stream is a Future) is cut off in this
// excerpt, as is the end of the function.
Stream.prototype.zip = function(s2) {
return this.isEmpty || this.isAbort ? this
: s2.isEmpty || s2.isAbort ? s2
: this.isCons && s2.isCons ?
Stream.Cons([this.head, s2.head], this.tail.zip(s2.tail))
: this.isCons && s2.isFuture ?
Stream.Future(s2.promise.then(s => this.zip(s), Stream.Abort))
: // this.isFuture && (s2.isCons || s2.isFuture)
// flatten : ( Stream (Stream a), (Stream a, Stream a) -> Stream a) -> Stream a
// Folds a stream of streams into one stream, from the right: `f` receives each
// head together with the already-flattened remainder and returns their
// combination.
// - Empty and Abort streams are returned unchanged.
// - For a Future, flattening resumes once its promise fulfils; a rejection
//   becomes Stream.Abort(reason).
Stream.prototype.flatten = function(f) {
  if (this.isEmpty || this.isAbort) {
    return this;
  }

  if (this.isCons) {
    return f( this.head, this.tail.flatten(f) );
  }

  // isFuture
  const pending = this.promise.then(inner => inner.flatten(f), Stream.Abort);
  return Stream.Future(pending);
};
// mergeAll : Stream (Stream a) -> Stream a
// Flattens a stream of streams, combining each inner stream with the flattened
// remainder through `merge`.
Stream.prototype.mergeAll = function() {
  const mergePair = (inner, rest) => inner.merge(rest);
  return this.flatten(mergePair);
};