Skip to content

Instantly share code, notes, and snippets.

@caroso1222
Last active December 15, 2018 17:18
Show Gist options
  • Save caroso1222/886fc759347e9c93b7bbf7c8976e02c4 to your computer and use it in GitHub Desktop.
Save caroso1222/886fc759347e9c93b7bbf7c8976e02c4 to your computer and use it in GitHub Desktop.
RxJS Pausify
import { Subject, Observable, never, BehaviorSubject } from 'rxjs';
import { switchMap, takeUntil, tap, materialize, dematerialize } from 'rxjs/operators';
/** An `Observable<T>` intersected with `Pauser<T>`; this is the type `pausify` returns. */
export type PauseableObservable<T> = Observable<T> & Pauser<T>;
/**
 * Wraps an observable in a `Pauser` so that it can be controlled from outside.
 *
 * @param obs - The source observable to wrap.
 * @returns A new `Pauser` built around `obs`, typed as a `PauseableObservable`.
 */
export function pausify<T>(obs: Observable<T>): PauseableObservable<T> {
  const pauseable = new Pauser<T>(obs);
  return pauseable as PauseableObservable<T>;
}
/**
 * Wraps a source observable and exposes `pause()`, `resume()` and `stop()`
 * controls alongside forwarded copies of the `Observable` prototype methods.
 *
 * Pausing is implemented with `switchMap`: while paused the stream is switched
 * to `never()`, and on resume it is switched back to `source`, which means the
 * source is subscribed to again each time the stream is resumed.
 */
class Pauser<T> {
  /** Emits `true` to pause and `false` to resume; replays the latest state to new subscribers. */
  private readonly pauser$: BehaviorSubject<boolean>;
  /** The pauseable pipeline that every forwarded `Observable` method operates on. */
  private readonly observable: Observable<T>;
  /** Notifier used by `takeUntil` to complete active subscriptions when `stop()` is called. */
  private readonly stop$ = new Subject<void>();
  /** Mirror of the last value pushed to `pauser$`, kept in sync eagerly (not per subscription). */
  private paused: boolean;

  /**
   * @param source - The observable whose emissions should be pauseable.
   * @param paused - Initial state; when `true` nothing is emitted until `resume()` is called.
   */
  constructor(source: Observable<T>, paused = false) {
    // Record the initial state right away so resume()/pause() behave correctly
    // even before anything has subscribed.
    this.paused = paused;
    this.pauser$ = new BehaviorSubject<boolean>(paused);

    this.observable = this.pauser$.pipe(
      // materialize() turns the source's completion into a regular value so it
      // survives switchMap; dematerialize() below then completes the whole
      // stream instead of leaving it open on the never-ending pauser$.
      switchMap(isPaused => (isPaused ? never() : source.pipe(materialize()))),
      dematerialize(),
      takeUntil(this.stop$)
    );

    // Forward every Observable prototype method to the inner pipeline.
    // Own property names are used rather than `for...in` because methods
    // declared with `class` syntax are non-enumerable and would be skipped.
    const self = this as unknown as Record<string, unknown>;
    const delegate = this.observable as unknown as Record<string, unknown>;
    for (const key of Object.getOwnPropertyNames(Observable.prototype)) {
      if (key === 'constructor' || typeof delegate[key] !== 'function') {
        continue;
      }
      self[key] = (...args: unknown[]) =>
        (delegate[key] as (...params: unknown[]) => unknown).apply(delegate, args);
    }
  }

  /** Pauses the stream; does nothing if it is already paused. */
  pause(): void {
    if (!this.paused) {
      this.paused = true;
      this.pauser$.next(true);
    }
  }

  /** Resumes the stream; does nothing if it is not paused. */
  resume(): void {
    if (this.paused) {
      this.paused = false;
      this.pauser$.next(false);
    }
  }

  /**
   * Completes all currently active subscriptions via the `takeUntil` notifier.
   *
   * NOTE(review): `stop$` is a plain `Subject` that is completed here, so
   * subscribers that arrive after `stop()` presumably will not be stopped —
   * confirm whether that is intended.
   */
  stop(): void {
    this.stop$.next();
    this.stop$.complete();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment