Created
October 8, 2015 01:43
-
-
Save jethrolarson/a54492b99f6cb739a9b2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Similar to Future but contains a series of future values
// NOTE(review): `I` is used below as the default pusher and `compose` to chain
// pushers; presumably identity and function composition -- confirm in ../util.
const {I, compose} = require('../util');
// Stream constructor; instances are normally created through the `Stream` factory.
// `pusher` is the function stored as `push` and invoked for every value pushed
// into the stream. When it is omitted (or falsy) `I` is used instead.
// ( -> a) -> Stream a
function _Stream(pusher) {
  this.push = pusher || I;
  // Type tag identifying the instance as a Stream.
  this['@@type'] = "Stream";
}
// Factory wrapper so streams can be created without `new`.
const Stream = pusher => new _Stream(pusher);
// Sentinel pusher installed on closed streams; ignores its input and yields null.
// (Was written with `->`, which is not valid JavaScript arrow syntax.)
const noop = () => null;
// close breaks the stream: its pusher is replaced by `noop`, so every later
// push yields null. Returns `noop` (the value of the assignment).
const close = s => (s.push = noop);
// True when the stream's pusher is the `noop` sentinel that `close` installs.
//:: Stream a -> Boolean
const closed = s => s.push === noop;
// Wraps a single value in a new stream.
// The previous version returned the result of `push` (the value itself) rather
// than a Stream, contradicting the signature below. It now mirrors `fromArray`:
// create the stream, push the value, return the stream.
// NOTE(review): a Stream keeps no buffer, so the pushed value is not retained.
//:: a -> Stream a
const of = a => {
  const s = Stream();
  s.push(a);
  return s;
};
// Builds a new stream whose pusher is `f` composed with the pusher of `s`.
// The source pusher is captured at call time, so replacing `s.push` later
// (e.g. via `close`) does not affect the stream returned here.
// NOTE(review): assumes `compose(f, g)(x)` is `f(g(x))` -- confirm in ../util.
//:: (a -> b) -> Stream a -> Stream b
const map = f => s => Stream(compose(f, s.push));
// Given a function that creates streams of `b` from values `a`, and a stream of `a`, return a stream of all `b`s.
// The result stream `b` is referenced from inside its own definition, so the
// forwarder looks `b` up lazily at push time. The previous code read `b.push`
// while `b` was still undefined and threw a TypeError on construction.
//:: (a -> Stream b) -> Stream a -> Stream b
const chain = f => a => {
  let b;
  // Deferred lookup: `b` is assigned by the time any value is forwarded.
  const forward = x => b.push(x);
  // returned stream yields when any streams mapped to original stream yield
  b = map(compose(map(forward), f))(a);
  return b;
};
//TODO this also won't work
// var a = Stream()
// var b = chain(makeBStream)(a)
// a.push(1)
// nothing happens on b
// Curried, mutating append: pushes `v` onto `list` in place and returns the
// new length (the result of Array.prototype.push).
//:: [a] -> a -> Int
const dirtyPush = list => v => list.push(v);
// Combines values of two streams pairwise with a curried function `f`.
// Internally stateful and could consume a lot of memory if `a` outruns `b`,
// because unmatched values wait in a queue until the other side yields.
//:: (a -> b -> c) -> Stream a -> Stream b -> Stream c
const lift = f => a => b => {
  const c = Stream();
  const queueA = [];
  const queueB = [];
  // Runs after every enqueue: closes `c` once either input is closed, otherwise
  // emits one combined value when both queues have something waiting.
  const check = () => {
    if (closed(a) || closed(b)) {
      close(c);
    } else if (queueA.length && queueB.length) {
      // Streams only have `push` (the old code called a non-existent `pusher`),
      // and `shift` takes the oldest entries so pairs are matched in arrival order.
      c.push(f(queueA.shift())(queueB.shift()));
    }
  };
  //TODO this wont work. nothing associates the compose with the original function...
  map(compose(check, dirtyPush(queueA)))(a);
  map(compose(check, dirtyPush(queueB)))(b);
  return c;
};
// Given a stream of functions from `a` to `b`, and a stream of `a`, return a stream of `b`
// Implemented as `lift(I)`: each queued function is applied to the paired value.
// NOTE(review): relies on `I` being the identity function -- confirm in ../util.
//:: Stream (a -> b) -> Stream a -> Stream b
const ap = lift(I);
// Keeps only the values for which predicate `f` is truthy; rejected values
// yield null from the resulting stream's pusher.
// The previous callback called `b.push(x)` on the stream being defined, which
// re-entered the same callback and recursed without bound whenever `f(x)` was truthy.
//:: (a -> Boolean) -> Stream a -> Stream a
const filter = f => s => map(x => (f(x) ? x : null))(s);
//TODO won't work
// (the streams produced by the two `map` calls are discarded, so nothing routes
// values pushed on `a` or `b` into `c`)
//events on both streams happen on returned stream
//:: Stream a -> Stream b -> Stream (a && b)
const concat = a => b => {
  const c = Stream();
  map(c.push)(a);
  map(c.push)(b);
  return c;
};
// Future constructor used by `some` and `find`; called as Future((rej, res) => ...).
const Future = require('./future');
// Resolves true as soon as a value satisfies `pred`; resolves false if a value
// arrives after the stream `s` has been closed. Never rejects.
//:: (a -> Boolean) -> Stream a -> Future Boolean
const some = pred => s =>
  Future((rej, res) =>
    map(x => (closed(s) ? res(false) : pred(x) ? res(true) : null))(s)
  );
// Resolves with the first value satisfying `pred`; rejects with the incoming
// value if one arrives after the stream `s` has been closed.
// (The declaration previously read `const find => pred`, a syntax error.)
//:: (a -> Boolean) -> Stream a -> Future a
const find = pred => s =>
  Future((rej, res) =>
    map(x => (closed(s) ? rej(x) : pred(x) ? res(x) : null))(s)
  );
//Closes result stream after taking n
// Each application keeps its own counter, so a partially applied `take(n)` can
// be reused on several streams (the old code decremented the shared `n`).
// The result stream `b` is the one closed, as stated above; the old code closed
// the source `s` instead.
//:: Int -> Stream -> Stream
const take = n => s => {
  const b = Stream();
  let remaining = n;
  map(x => (0 < remaining-- ? b.push(x) : close(b)))(s);
  return b;
};
// Ignores the first n values, then forwards every later value to the result stream.
// Each application keeps its own counter, so a partially applied `drop(n)` can
// be reused on several streams (the old code decremented the shared `n`).
//:: Int -> Stream -> Stream
const drop = n => s => {
  const b = Stream();
  let remaining = n;
  map(x => (0 < remaining-- ? null : b.push(x)))(s);
  return b;
};
// Pushes every element of `arr` into a fresh stream, then closes and returns it.
// NOTE(review): a Stream keeps no buffer, so the pushed elements are not retained.
//:: [a] -> Stream a
const fromArray = arr => {
  const s = Stream();
  // Pass only the element; forEach would otherwise also hand over index and array.
  arr.forEach(x => s.push(x));
  close(s);
  return s;
};
// Collects values into an array, which is returned immediately and filled as
// values are pushed through the stream mapped from `s`.
// (The original arrow body had no braces and no return, which is a syntax error.)
//:: Stream a -> [a]
const toArray = s => {
  const arr = [];
  map(x => arr.push(x))(s);
  return arr;
};
// Attach the core combinators as static properties of the Stream factory and
// export the factory as the module's value.
Object.assign(Stream, {of, map, chain, ap});
module.exports = Stream;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment