Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// Similar to Future but contains a series of future values
const {I, compose} = require('../util')
// Constructor behind `Stream`. Stores the pusher function on the instance as
// `push` and tags the instance with an '@@type' of "Stream".
//:: ( -> a) -> Stream a
function _Stream(pusher) {
  // A missing (or otherwise falsy) pusher falls back to `I` from ../util.
  const push = pusher ? pusher : I
  Object.assign(this, { push, '@@type': "Stream" })
}
// Factory wrapper so callers can build a stream without writing `new`.
const Stream = pusher => {
  return new _Stream(pusher)
}
// Sentinel pusher installed on closed streams: ignores its input and yields null.
const noop = () => null
// close breaks the stream by replacing its pusher with the `noop` sentinel.
// The value of the assignment (`noop`) is what the call returns.
const close = s => (s.push = noop)
// A stream is considered closed exactly when its pusher is the `noop` sentinel.
const closed = s => s.push === noop
// Builds a fresh stream, pushes `a` into it once, and returns the stream
// itself (not the result of the push), so the result can be handed to
// functions that expect a Stream.
// NOTE(review): the push happens before any caller can derive a stream from
// the result; confirm this is the intended meaning of `of` for this type.
//:: a -> Stream a
const of = a => {
  const s = Stream()
  s.push(a)
  return s
}
// Derives a new stream whose pusher is `compose(f, s.push)`.
// `s.push` is read once, at the time `map(f)(s)` is called; replacing
// `s.push` afterwards does not affect the derived stream.
// Assumes `compose` (from ../util) applies its last argument first, i.e. the
// derived pusher runs `s.push` and then `f` — TODO confirm against ../util.
//:: (a -> b) -> Stream a -> Stream b
const map = f => s => {
  const derivedPusher = compose(f, s.push)
  return Stream(derivedPusher)
}
// Given a function that creates streams of `b` from values `a`, and a stream of `a`, return a stream of all `b`s.
//:: (a -> Stream b) -> Stream a -> Stream b
const chain = f => a => {
  // returned stream yields when any streams mapped to original stream yield
  // `b.push` must be looked up lazily (inside the arrow): `b` does not exist
  // yet while its own initializer is being evaluated, so reading `b.push`
  // eagerly here throws a TypeError on every call.
  const b = map(
    compose(map(x => b.push(x)), f)
  )(a)
  return b
}
//TODO this also won't work
// var a = Stream()
// var b = chain(makeBStream)(a)
// a.push(1)
// nothing happens on b
// Curried, mutating append: `dirtyPush(list)(v)` adds `v` to the end of `list`
// in place and returns whatever `list.push` returns (for arrays, the new length).
const dirtyPush = list => v => {
  return list.push(v)
}
// Combines two streams with a curried binary function: values from `a` and `b`
// are buffered separately, and whenever both buffers are non-empty the oldest
// value of each is removed and `f(valueA)(valueB)` is pushed onto the result.
//Internally stateful and could consume a lot of memory if `a` outruns `b`
//:: (a -> b -> c) -> Stream a -> Stream b -> Stream c
const lift = f => a => b => {
  const c = Stream()
  const queueA = []
  const queueB = []
  // Runs after every buffered value; its argument (the return value of
  // dirtyPush) is ignored.
  const check = () => {
    if (closed(a) || closed(b)) {
      // Either input being closed closes the result as well.
      close(c)
    } else if (queueA.length && queueB.length) {
      // shift() keeps the buffers first-in/first-out so values pair up in
      // arrival order; the stream's pusher lives on `push`.
      c.push(f(queueA.shift())(queueB.shift()))
    }
  }
  //TODO this won't work. nothing associates the compose with the original function...
  // (the streams returned by these two map calls are discarded)
  map(compose(check, dirtyPush(queueA)))(a)
  map(compose(check, dirtyPush(queueB)))(b)
  return c
}
// Applicative apply, expressed as `lift` specialised to `I` from ../util:
// takes a stream of functions from `a` to `b` and a stream of `a`, and
// returns whatever `lift(I)` returns for those two streams.
//:: Stream (a -> b) -> Stream a -> Stream b
const ap = fnStream => valueStream => lift(I)(fnStream)(valueStream)
// Derives a stream from `s` that yields a value only when `f` accepts it;
// rejected values yield null instead.
// The accepted value is returned directly. Calling the derived stream's own
// `push` from inside its pusher would re-enter the same pusher and recurse
// without bound whenever `f` accepts a value.
// NOTE(review): null doubles as the "nothing yielded" marker here, matching
// what a closed stream's pusher returns — confirm that is acceptable.
//:: (a -> Boolean) -> Stream a -> Stream a
const filter = f => s => {
  const keepOrNull = x => (f(x) ? x : null)
  return map(keepOrNull)(s)
}
//TODO won't work
// Intended to merge two streams so that events on either one appear on the
// returned stream. As written it creates a new stream, maps that stream's
// `push` over `a` and then over `b`, discards both mapped streams, and
// returns the new stream.
//:: Stream a -> Stream b -> Stream (a && b)
const concat = a => b => {
  const merged = Stream()
  for (const source of [a, b]) {
    map(merged.push)(source)
  }
  return merged
}
var Future = require('./future')
// Wraps a predicate test over stream `s` in a Future. The Future's executor
// receives `(rej, res)` and maps a handler over `s`; for each value the
// handler sees:
//   - if `s` is closed, it calls res(false);
//   - otherwise, if `pred` accepts the value, it calls res(true);
//   - otherwise it yields null.
// `rej` is never called.
//:: (a -> Boolean) -> Stream a -> Future Boolean
const some = pred => s =>
  Future((rej, res) => {
    const settle = x => {
      if (closed(s)) {
        return res(false)
      }
      return pred(x) ? res(true) : null
    }
    return map(settle)(s)
  })
// Wraps a search over stream `s` in a Future. The Future's executor receives
// `(rej, res)` and maps a handler over `s`; for each value the handler sees:
//   - if `s` is closed, it rejects with that value;
//   - otherwise, if `pred` accepts the value, it resolves with it;
//   - otherwise it yields null.
//:: (a -> Boolean) -> Stream a -> Future a
const find = pred => s =>
  Future((rej, res) => map(x => closed(s) ? rej(x) : pred(x) ? res(x) : null)(s))
// Forwards the first `n` values seen by the mapped handler to a new stream
// `b`; for every value after that it calls `close(s)` on the SOURCE stream.
// The stream produced by `map` is discarded and `b` is returned.
// NOTE(review): the previous header said the *result* stream is closed after
// taking n, but the code closes the source `s` — confirm which is intended.
//:: Int -> Stream -> Stream
const take = n => s => {
  const b = Stream()
  let remaining = n
  const forward = x => {
    if (0 < remaining--) {
      return b.push(x)
    }
    return close(s)
  }
  map(forward)(s)
  return b
}
// Ignores the first `n` values seen by the mapped handler (yielding null for
// each) and forwards every later value to a new stream `b`.
// The stream produced by `map` is discarded and `b` is returned.
//:: Int -> Stream -> Stream
const drop = n => s => {
  const b = Stream()
  let toSkip = n
  const forward = x => {
    if (0 < toSkip--) {
      return null
    }
    return b.push(x)
  }
  map(forward)(s)
  return b
}
// Creates a stream, hands every element of `arr` to the stream's pusher via
// `forEach` (so the pusher is called with element, index and array), then
// closes the stream and returns it.
//:: [a] -> Stream a
const fromArray = arr => {
  const stream = Stream()
  // The pusher is read once, up front, and passed unbound to forEach.
  const emit = stream.push
  arr.forEach(emit)
  close(stream)
  return stream
}
// Maps a collecting handler over `s` and returns the array that handler
// appends to. The array is returned immediately and starts empty; it grows
// only as the handler is invoked.
// NOTE(review): the stream produced by `map` is discarded here, so whether the
// handler ever runs depends on how `map` is wired — confirm.
//:: Stream a -> [a]
const toArray = s => {
  const arr = []
  map(x => arr.push(x))(s)
  return arr
}
// Expose the core operations as static members of the Stream factory, then
// export the factory as this module's value. Only of/map/chain/ap are attached.
Stream.of = of
Stream.map = map
Stream.chain = chain
Stream.ap = ap
module.exports = Stream
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.