Skip to content

Instantly share code, notes, and snippets.

@rpominov
Created December 25, 2015 22:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rpominov/20b62ed56c78376c224f to your computer and use it in GitHub Desktop.
Save rpominov/20b62ed56c78376c224f to your computer and use it in GitHub Desktop.
/* @flow */
/* A sink is a callback that receives one emitted value (`payload`) at a time. */
type Sink<T> = (payload: T) => void
/* A zero-argument function returned by a stream on subscription.
 * Presumably calling it cancels that subscription (the combinators in this
 * file invoke it during their own teardown).
 */
type Disposer = () => void
/* A stream is just a function: give it a sink, get back a disposer. */
type Stream<T> = (s: Sink<T>) => Disposer
/* Plain unary function from `A` to `B`. */
type F<A, B> = (x: A) => B
/* Function that transforms a `Stream<A>` into a `Stream<B>`. */
type LiftedF<A, B> = (s: Stream<A>) => Stream<B>
/* Lifts function `A => B` to a funcion that operates
* on streams `Stream<A> => Stream<B>`
*/
type Lift<A, B> = (f: F<A, B>) => LiftedF<A, B>
export const lift: Lift = fn => stream =>
sink => stream(payload => sink(fn(payload)))
/* Give a predicate `A => boolean` returns a funcion
* that operates on streams `Stream<A> => Stream<A>`.
* The result function returns a stream without values that don't satisfy predicate.
*/
type Filter<A> = (f: F<A, boolean>) => LiftedF<A, A>
export const filter: Filter = predicate => stream =>
sink => stream(payload => {
if (predicate(payload)) {
sink(payload)
}
})
/* Given a function `A => Stream<B>` returns a funcion
* that operates on streams `Stream<A> => Stream<B>`.
* The result function will spawn a `Stream<B>` for each value from `Stream<A>` using provided function.
* The final `Stream<B>` will contain values from all spawned `Stream<B>`.
*/
type Chain<A, B> = (f: F<A, Stream<B>>) => LiftedF<A, B>
export const chain: Chain = fn => stream =>
sink => {
let spawnedDisposers = []
const mainDisposer = stream(payload => {
spawnedDisposers.push(fn(payload)(sink))
})
return () => {
spawnedDisposers.forEach(fn => fn())
mainDisposer()
}
}
/* Same as chain(), except the final `Stream<B>` will contain
* only that values from each spawned streams that was
* emitted before next stream was spawned.
*/
export const chainLatest: Chain = fn => stream =>
sink => {
let spawnedDisposers = () => {}
const mainDisposer = stream(payload => {
spawnedDisposers()
spawnedDisposers = fn(payload)(sink)
})
return () => {
spawnedDisposers()
mainDisposer()
}
}
@rpominov
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment