Skip to content

Instantly share code, notes, and snippets.

@erodactyl
Created October 10, 2021 20:22
Show Gist options
  • Save erodactyl/c00a18ee751e3e35d05df542dd41759d to your computer and use it in GitHub Desktop.
Save erodactyl/c00a18ee751e3e35d05df542dd41759d to your computer and use it in GitHub Desktop.
Implementation of a basic RxJS-style Observable and a few operators.
/**
 * Consumer side of an Observable: an object whose `next` callback is invoked
 * with each emitted element of type `T`.
 */
interface Observer<T> {
/** Called once per emitted element. */
next: (el: T) => void;
}
/**
 * Teardown function returned by an Observable's producer callback; the
 * Observable invokes it when the subscription is unsubscribed.
 */
type Dispose = () => void;
/**
 * A pipeable operator: takes a source Observable of `T` and returns an
 * Observable of `U`. Operators are applied in order by `Observable.pipe`.
 */
type Operator<T, U> = (source$: Observable<T>) => Observable<U>
/**
 * Minimal push-based stream of values of type `T`.
 *
 * The producer callback given to the constructor is run once for every
 * `subscribe` call, so each subscriber gets its own independent execution.
 */
class Observable<T> {
  /**
   * @param cb Producer. Receives the observer to push values into and returns
   *           a teardown function that releases whatever the producer set up.
   */
  constructor(private readonly cb: (observer: Observer<T>) => Dispose) {}

  /**
   * Starts the producer and delivers its values to `observer.next`.
   *
   * @param observer Receiver of emitted values.
   * @returns A handle whose `unsubscribe` runs the producer's teardown.
   *          `unsubscribe` is idempotent: the teardown runs at most once, and
   *          no value is delivered to `observer` after the first call.
   */
  subscribe(observer: Observer<T>): { unsubscribe: () => void } {
    // Set on the first unsubscribe; guards both delivery and teardown.
    let closed = false;

    const dispose = this.cb({
      next: (el) => {
        // A producer may keep emitting after teardown; drop those values.
        if (!closed) {
          observer.next(el);
        }
      },
    });

    return {
      unsubscribe: () => {
        if (closed) {
          return;
        }
        closed = true;
        dispose();
      },
    };
  }

  /**
   * Chains operators left to right, feeding each one the Observable returned
   * by the previous one. With no operators, returns this Observable itself.
   */
  pipe(): Observable<T>;
  pipe<A>(fn: Operator<T, A>): Observable<A>;
  pipe<A, B>(fn1: Operator<T, A>, fn2: Operator<A, B>): Observable<B>;
  pipe<A, B, C>(fn1: Operator<T, A>, fn2: Operator<A, B>, fn3: Operator<B, C>): Observable<C>;
  pipe<A, B, C>(fn1: Operator<T, A>, fn2: Operator<A, B>, fn3: Operator<B, C>, ...rest: Operator<any, any>[]): Observable<any>;
  pipe(...operators: Operator<any, any>[]): Observable<any> {
    return operators.reduce<Observable<any>>((source, operator) => operator(source), this);
  }
}
/**
 * Creates an Observable that emits an increasing counter (0, 1, 2, ...) every
 * `tick` milliseconds. Each subscription starts its own timer and its own
 * counter; unsubscribing clears that timer.
 *
 * @param tick Delay between emissions, passed straight to `setInterval`.
 */
const interval = (tick: number) =>
  new Observable<number>((subscriber) => {
    let counter = 0;

    const emitNext = () => {
      // Advance the counter before emitting so it moves on even if `next` throws.
      const current = counter;
      counter += 1;
      subscriber.next(current);
    };

    const timerId = setInterval(emitNext, tick);
    return () => clearInterval(timerId);
  });
/**
 * Operator that transforms every element of the source with `mapper` and
 * emits the result downstream.
 *
 * @param mapper Function applied to each source element.
 * @returns An operator that subscribes to the source when the resulting
 *          Observable is subscribed, and unsubscribes from it on teardown.
 */
const map = <T, U>(mapper: (el: T) => U) => (source$: Observable<T>): Observable<U> =>
  new Observable<U>((downstream) => {
    const upstream = source$.subscribe({
      next: (value) => {
        const mapped = mapper(value);
        return downstream.next(mapped);
      },
    });
    return () => upstream.unsubscribe();
  });
/**
 * Operator that runs a side-effect callback for every element and then
 * forwards the element unchanged.
 *
 * @param cb Side effect invoked with each element before it is re-emitted.
 * @returns An operator that subscribes to the source when the resulting
 *          Observable is subscribed, and unsubscribes from it on teardown.
 */
const tap = <T>(cb: (el: T) => void) => (source$: Observable<T>): Observable<T> =>
  new Observable<T>((downstream) => {
    // Side effect first, then pass the same element along.
    const forward = (value: T): void => {
      cb(value);
      downstream.next(value);
    };
    const upstream = source$.subscribe({ next: forward });
    return () => upstream.unsubscribe();
  });
// Demo: emit 0, 1, 2, ... once per second, double each value and log it.
const doubleInterval = interval(1000).pipe(map(el => el * 2), tap(console.log))
const sub = doubleInterval.subscribe({ next: () => {} })
// Stop after 10 seconds. This must be a one-shot timer: a repeating
// setInterval would call unsubscribe every 10 seconds forever and keep the
// process alive after the stream has been torn down.
setTimeout(() => {
  sub.unsubscribe()
}, 10000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment