This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// any : ( Stream a, a -> aBoolean ) -> Promise Boolean | |
Stream.prototype.any = function(pred) { | |
return this.isEmpty ? Promise.resolve(false) | |
: this.isAbort ? Promise.reject(this.err) | |
: this.isCons ? | |
(pred(this.head) ? Promise.resolve(true) : this.tail.any(pred) ) | |
: /* isFuture */ | |
this.promise.then( s => s.any(pred) ); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// takeUntil : (Stream a, Promise) => Stream a | |
Stream.prototype.takeUntil = function(untilP) { | |
return this.isEmpty || this.isAbort ? this | |
: this.isCons ? Stream.Cons( this.head, this.tail.takeUntil(untilP) ) | |
: Stream.Future( | |
// can you spot a problem here ? | |
Promise.race([ | |
untilP.then( _ => Stream.Empty, Stream.Abort ), | |
this.promise.then( s => s.takeUntil(untilP), Stream.Abort ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Stream.prototype.takeUntil = function(untilP) { | |
... | |
// isFuture | |
Stream.Future( | |
Promise.race([ | |
untilP.then( _ => () => Stream.Empty, Stream.Abort ), | |
this.promise.then( s => () => s.takeUntil(promise), Stream.Abort ) | |
]).then( lazy => lazy())) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// map : ( Stream a, a -> b | Promise b ) => Stream b | |
Stream.prototype.map = function(f) { | |
let tailM, futP; | |
return this.isEmpty || this.isAbort ? this | |
: this.isCons ? | |
( tailM = this.tail.map(f), | |
futP = Promise.resolve(f(this.head)) | |
.then( head => Stream.Cons(head, tailM), Stream.Abort ), | |
Stream.Future(futP)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// filter : (Stream a, a -> aBoolean | Promise aBoolean) => Stream a | |
Stream.prototype.filter = function(p) { | |
let tailF, futP; | |
return this.isEmpty || this.isAbort ? this | |
: this.isCons ? | |
( tailF = this.tail.filter(p), | |
futP = Promise.resolve( p(this.head) ) | |
.then( ok => ok ? Stream.Cons(this.head, tailF) : tailF, Stream.Abort ) | |
Stream.Future(futP)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// concat : (Stream a, Stream a) => Stream a | |
Stream.prototype.concat = function(s2) { | |
return this.isEmpty ? s2 | |
: this.isAbort ? this | |
: this.isCons ? Stream.Cons( this.head, this.tail.concat(s2) ) | |
: Stream.Future(this.promise.then(s => s.concat(s2), Stream.Abort)) | |
} | |
Stream.seq([1,2,3,4,5], 2000, 1000) // 1st stream will yields normally |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// merge : (Stream a, Stream a) => Stream a | |
Stream.prototype.merge = function(s2) { | |
return this.isEmpty ? s2 | |
: this.isAbort ? this | |
: this.isCons ? Stream.Cons( this.head, this.tail.merge(s2) ) | |
: ( !s2.isFuture ? | |
s2.merge(this) | |
: Stream.Future( | |
Promise.race([ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// zip : (Stream a, Stream b) => Stream [a,b] | |
Stream.prototype.zip = function(s2) { | |
return this.isEmpty || this.isAbort ? this | |
: s2.isEmpty || s2.isAbort ? s2 | |
: this.isCons && s2.isCons ? | |
Stream.Cons([this.head, s2.head], this.tail.zip(s2.tail)) | |
: this.isCons && s2.isFuture ? | |
Stream.Future(s2.promise.then(s => this.zip(s), Stream.Abort)) | |
: // this.isFuture && (s2.isCons || s2.isFuture) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// flatten : ( Stream (Stream a), (Stream a, Stream a) -> Stream a) -> Stream a | |
Stream.prototype.flatten = function(f) { | |
return this.isEmpty || this.isAbort ? this | |
: this.isCons ? f( this.head, this.tail.flatten(f) ) | |
: Stream.Future(this.promise.then(s => s.flatten(f), Stream.Abort)); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// mergeAll : Stream (Stream a) -> Stream a | |
Stream.prototype.mergeAll = function() { | |
return this.flatten( (s1, s2) => s1.merge(s2) ); | |
} |