Skip to content

Instantly share code, notes, and snippets.

@timruffles
Created January 23, 2017 12:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timruffles/2e419db8b7eda3519fbbc603d87541e6 to your computer and use it in GitHub Desktop.
Save timruffles/2e419db8b7eda3519fbbc603d87541e6 to your computer and use it in GitHub Desktop.
Functional Observables
{
/*
interface Observer<T> {
next(t: T): void;
error(e: Error): void
complete(): void;
}
*/
class Observer {
constructor(next, error, complete) {
this.next = next;
this.error = error;
this.complete = complete;
}
}
const incrementer = scanObservable(createIntervalObservable(100), function(s, item) {
return s + 1;
}, 0);
const squaredIncrementer = mapObservable(incrementer, x => x * x);
const firstTenSquares = take(squaredIncrementer, 10);
// {{ square$ | async }}
firstTenSquares({
next(n) {
console.log("heard", n);
},
complete() {
console.log("finished");
},
})
// type CancelSubscription = () => void;
// type FunctionObservable<T> = (observer: Observer<T>) => CancelSubscription
function createIntervalObservable(nMilliseconds) {
return function(observer) {
let intervalToken = setInterval(() => observer.next(), nMilliseconds);
// cancellation
return () => {
if(intervalToken) {
clearInterval(intervalToken)
intervalToken = null;
}
}
}
}
// mapObservable(o: FunctionObservable<T>, mapper: (t: T) => U): FunctionObservable<U>
function mapObservable(observable, mapper) {
return function(observer) {
return observable({
next(t) {
const u = mapper(t);
observer.next(u);
},
error: (e) => observer.error(e),
complete: () => observer.complete(),
})
}
}
// scanObservable<S,T>(o: FunctionObservable<T>, reducer: (s: S, t: T) => S): FunctionObservable<S>
function scanObservable(observable, reducer, state) {
return function(observer) {
return observable({
next(t) {
state = reducer(state, t);
observer.next(state);
},
error: (e) => observer.error(e),
complete: () => observer.complete(),
})
}
}
function take(observable, n) {
return function(observer) {
const unsubscribe = observable({
next(t) {
if(n--) {
observer.next(t);
} else {
observer.complete();
unsubscribe();
}
},
error: (e) => observer.error(e),
complete: () => observer.complete(),
})
return unsubscribe;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment