Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Observables with pure FP
// An Observable is a union type encoded as a tagged array; the first slot
// names the variant and the remaining slots carry its payload:
//   ['EMPTY']             – the end of the stream
//   ['CONS', head, tail]  – a value (head) followed by another Observable (tail)
//   ['FUTURE', promise]   – a promise; the combinators in this file expect it
//                           to resolve to an Observable
// Each constructor returns a fresh array on every call.
const Empty = () => {
  return ['EMPTY']
}
const Cons = (head, tail) => {
  return ['CONS', head, tail]
}
const Future = promise => {
  return ['FUTURE', promise]
}
// lazyRace: (Promise<() → a>, Promise<() → a>) → Promise<a>
// Races two promises that each resolve to a thunk (a zero-argument function).
// Only the thunk of the promise that settles first is invoked, and its return
// value becomes the resolution of the returned promise.
const lazyRace = (p1, p2) => {
  const winner = Promise.race([p1, p2])
  return winner.then(thunk => thunk())
}
// compose: (...functions) → any → any
// Right-to-left function composition: compose(f, g)(x) is f(g(x)).
// With no functions, the composed function returns its argument unchanged.
const compose = (...fns) => (arg) => {
  let result = arg
  // Walk the list backwards so the last function given is applied first.
  for (let i = fns.length - 1; i >= 0; i--) {
    result = fns[i](result)
  }
  return result
}
// Pattern matching helper: we'll use it to match an Observable instance.
// match: cases → [type, ...args] → any
//   cases – object mapping a variant tag (e.g. 'EMPTY', 'CONS', 'FUTURE') to a
//           handler; the optional `_` entry is the fallback handler.
// The returned function destructures a tagged array, picks the handler for its
// tag (or the fallback) and calls it with the remaining slots as arguments.
// Throws an Error when neither a matching handler nor a fallback exists.
const match = cases => ([type, ...args]) => {
  const handler = cases[type] || cases._
  if (!handler) {
    // Throw a real Error (not a bare string) so callers get a stack trace.
    throw new Error('Unknown case ' + type)
  }
  return handler(...args)
}
// f: any → any
// map: f → Observable → Observable
// Builds an Observable whose values are the results of applying fn to each
// value of the source. CONS cells are transformed immediately; FUTURE cells
// are transformed once their promise resolves.
const map = fn => {
  const mapObservable = match({
    EMPTY: Empty,
    CONS: (head, tail) => Cons(fn(head), mapObservable(tail)),
    FUTURE: promise => Future(promise.then(mapObservable))
  })
  return mapObservable
}
// p: any → Boolean
// filter: p → Observable → Observable
// Keeps only the values for which predicate returns a truthy result.
// Rejected CONS heads are dropped and the filtered tail takes their place.
const filter = predicate => {
  const keepMatching = match({
    EMPTY: Empty,
    CONS: (head, tail) => {
      if (!predicate(head)) {
        return keepMatching(tail)
      }
      return Cons(head, keepMatching(tail))
    },
    FUTURE: promise => Future(promise.then(keepMatching))
  })
  return keepMatching
}
// first: Observable → Promise
// Resolves with the first value of the Observable.
//   EMPTY  – rejects with the string 'Empty Observable'
//   CONS   – resolves with the head
//   FUTURE – waits for the promise, then looks at the Observable it yields
const first = match({
  EMPTY: () => Promise.reject('Empty Observable'),
  CONS: head => Promise.resolve(head),
  FUTURE: promise => promise.then(observable => first(observable))
})
// takeUntil: Promise → Observable → Observable
// Passes values through until untilP resolves. Values already available as
// CONS cells are always kept; at every FUTURE cell the pending promise is
// raced against untilP, and the stream ends (EMPTY) if untilP wins.
const takeUntil = untilP => {
  const take = match({
    EMPTY: Empty,
    CONS: (head, tail) => Cons(head, take(tail)),
    FUTURE: promise => {
      // Both sides resolve to thunks; lazyRace only runs the winner's.
      const stopped = untilP.then(() => Empty)
      const continued = promise.then(rest => () => take(rest))
      return Future(lazyRace(stopped, continued))
    }
  })
  return take
}
// skipUntil: Promise → Observable → Observable
// Drops values until untilP resolves. CONS heads are discarded; at a FUTURE
// cell the pending promise is raced against untilP: if untilP wins, the
// Observable as seen at that point (o) is returned untouched, otherwise
// skipping continues on whatever the pending promise resolved to.
const skipUntil = untilP => o => {
  const skipRest = skipUntil(untilP)
  const handlers = {
    EMPTY: Empty,
    CONS: (_, tail) => skipRest(tail),
    FUTURE: promise => {
      const released = untilP.then(() => o)
      const stillSkipping = promise.then(ro => () => skipRest(ro))
      return Future(lazyRace(released, stillSkipping))
    }
  }
  return match(handlers)(o)
}
// concat: (Observable, Observable) → Observable
// Emits every value of o1 and, once o1 reaches EMPTY, continues with o2.
const concat = (o1, o2) => {
  const appendSecond = match({
    EMPTY: () => o2,
    CONS: (head, tail) => Cons(head, appendSecond(tail)),
    FUTURE: promise => Future(promise.then(ro1 => appendSecond(ro1)))
  })
  return appendSecond(o1)
}
// merge: (Observable, Observable) → Observable
// Interleaves two Observables.
//   o1 EMPTY  – the result is o2
//   o1 CONS   – emit its head, then merge its tail with o2
//   o1 FUTURE – if o2 is a FUTURE too, race both promises and continue
//               merging from whichever resolves first (the other Observable
//               is passed along unchanged); otherwise swap the arguments so
//               the non-FUTURE side is handled first
const merge = (o1, o2) => {
  const whenFirstIsPending = promise1 => {
    const againstSecond = match({
      FUTURE: promise2 => {
        const firstWins = promise1.then(ro1 => () => merge(ro1, o2))
        const secondWins = promise2.then(ro2 => () => merge(ro2, o1))
        return Future(lazyRace(firstWins, secondWins))
      },
      _: () => merge(o2, o1)
    })
    return againstSecond(o2)
  }
  const againstFirst = match({
    EMPTY: () => o2,
    CONS: (head, tail) => Cons(head, merge(tail, o2)),
    FUTURE: whenFirstIsPending
  })
  return againstFirst(o1)
}
// relay: (Observable, Observable) → Observable
// Hands over from o1 to o2: when o2 is EMPTY the result is just o1;
// otherwise o1 is emitted until first(o2) resolves, and o2 is concatenated
// after it.
const relay = (o1, o2) => {
  const handOver = () => {
    const untilSecondStarts = takeUntil(first(o2))
    return concat(untilSecondStarts(o1), o2)
  }
  return match({
    EMPTY: () => o1,
    _: handOver
  })(o2)
}
// f: (Observable, Observable) → Observable
// flattenBy: f → Observable<Observable> → Observable
// Flattens an Observable of Observables by folding from the right: each inner
// Observable (head) is combined, using fn, with the flattened remainder.
const flattenBy = fn => {
  const flatten = match({
    EMPTY: Empty,
    CONS: (head, tail) => fn(head, flatten(tail)),
    FUTURE: promise => Future(promise.then(flatten))
  })
  return flatten
}
// *: Observable<Observable> → Observable
// Flatten an Observable of Observables, combining the inner Observables with
// merge, concat or relay respectively.
const mergeAll = flattenBy(merge)
const concatAll = flattenBy(concat)
const relayAll = flattenBy(relay)
// Original all-lowercase spelling, kept as an alias so existing callers
// continue to work; prefer relayAll, which matches mergeAll/concatAll.
const relayall = relayAll
// f: any → Observable
// *: f → Observable → Observable
// Map every value to an inner Observable with fn, then flatten the result:
//   concatMap     – inner Observables are joined with concat
//   flatMap       – inner Observables are joined with merge
//   flatMapLatest – inner Observables are joined with relay
const concatMap = fn => {
  const join = flattenBy(concat)
  const toInner = map(fn)
  return compose(join, toInner)
}
const flatMap = fn => {
  const join = flattenBy(merge)
  const toInner = map(fn)
  return compose(join, toInner)
}
const flatMapLatest = fn => {
  const join = flattenBy(relay)
  const toInner = map(fn)
  return compose(join, toInner)
}
// onNext: any → ()
// onDone: () → ()   (optional; defaults to a no-op)
// forEach: (onNext, onDone) → Observable → any
// Walks the Observable, calling onNext with every value and onDone (with no
// arguments) when EMPTY is reached. The returned consumer yields onDone's
// result for EMPTY, undefined for CONS, and a promise for FUTURE.
// Without the default, omitting onDone left the EMPTY case without a handler
// and made match throw at the end of the stream.
const forEach = (onNext, onDone = () => {}) => {
  const consume = match({
    EMPTY: onDone,
    CONS: (head, tail) => {
      onNext(head)
      consume(tail)
    },
    FUTURE: promise => promise.then(consume)
  })
  return consume
}
// log: String → Observable → ()
// Prints every value of the Observable with console.log, then prints 'done'
// at the end of the stream. When a prefix is given it is printed in front of
// every line (the parameter used to be accepted but silently ignored); when
// it is omitted the output is just the value / 'done'.
const log = prefix => {
  // Bind the prefix only if one was supplied, so log() does not print
  // "undefined" in front of each line.
  const labels = prefix === undefined ? [] : [prefix]
  return forEach(
    console.log.bind(console, ...labels),
    console.log.bind(console, ...labels, 'done')
  )
}
// fromArray: Array<a> → Observable<a>
// Converts an array into a synchronous Observable: a chain of CONS cells, one
// per element in order, terminated by EMPTY. An empty array yields EMPTY.
// The chain is built iteratively from the last element backwards, which
// avoids copying the array with slice() at every step and avoids recursion
// depth growing with the array length.
const fromArray = arr => {
  let observable = Empty()
  for (let i = arr.length - 1; i >= 0; i--) {
    observable = Cons(arr[i], observable)
  }
  return observable
}
// sequence: (Array<a>, Number) → Observable<a>
// Emits the elements of arr one at a time: each element is delivered through
// a FUTURE whose promise resolves from a setTimeout callback scheduled with a
// delay of ms. An empty array yields EMPTY immediately.
const sequence = (arr, ms) => {
  if (!arr.length) {
    return Empty()
  }
  const scheduleHead = resolve => {
    // The next cell is only built when the timer fires.
    const deliver = () => resolve(Cons(arr[0], sequence(arr.slice(1), ms)))
    setTimeout(deliver, ms)
  }
  return Future(new Promise(scheduleHead))
}
// range: (Number, Number, Number) → Observable<Number>
// Emits min, min + 1, …, max (inclusive). Each number is delivered through a
// FUTURE whose promise resolves from a setTimeout callback scheduled with a
// delay of ms. Yields EMPTY immediately when min is greater than max.
const range = (min, max, ms) => {
  if (min > max) {
    return Empty()
  }
  const scheduleNext = resolve => {
    // The next cell is only built when the timer fires.
    const deliver = () => resolve(Cons(min, range(min + 1, max, ms)))
    setTimeout(deliver, ms)
  }
  return Future(new Promise(scheduleNext))
}
// nextEvent: (EventTarget, String) → Promise<anEvent>
// Resolves with the next event of the given type dispatched on elm.
// The listener unregisters itself after handling that one event.
const nextEvent = (elm, eventType) =>
  new Promise(resolve => {
    function handleOnce(event) {
      resolve(event)
      elm.removeEventListener(eventType, handleOnce)
    }
    elm.addEventListener(eventType, handleOnce)
  })
// fromEvent: (EventTarget, String) → Observable<anEvent>
// An Observable of the events of the given type dispatched on elm.
// Each FUTURE waits for the next event; once it arrives, the event becomes
// the head of a CONS whose tail is a new fromEvent Observable, so the stream
// never reaches EMPTY.
const fromEvent = (elm, eventType) => {
  const upcoming = nextEvent(elm, eventType)
  const toCell = e => Cons(e, fromEvent(elm, eventType))
  return Future(upcoming.then(toCell))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment