Skip to content

Instantly share code, notes, and snippets.

@qmzik
Last active July 11, 2021 12:19
Show Gist options
  • Save qmzik/82c7513017d5c6eb5bb3ea3c86192023 to your computer and use it in GitHub Desktop.
Save qmzik/82c7513017d5c6eb5bb3ea3c86192023 to your computer and use it in GitHub Desktop.
/**
 * Guards a user-provided Observer against being driven past completion.
 *
 * An observer is just a plain object carrying a couple of callbacks, so
 * nothing stops a producer from calling them in an invalid order. This
 * wrapper forwards calls to the wrapped observer while enforcing that:
 *  - `next` is ignored once `complete` has been called, and
 *  - `complete` reaches the wrapped observer at most once.
 */
class SafeSubscriber<T> {
  /** Becomes `true` on the first `complete()` call and never resets. */
  closed = false;

  constructor(private destination: Observer<T>) {}

  /**
   * Forwards `value` to the wrapped observer unless this subscriber has
   * already been closed.
   *
   * @param value The value to deliver.
   */
  next(value: T): void {
    if (this.closed) {
      return;
    }
    this.destination.next(value);
  }

  /**
   * Closes this subscriber and notifies the wrapped observer. Any call
   * after the first one is a no-op.
   */
  complete(): void {
    if (this.closed) {
      return;
    }
    // Flip the flag before notifying, so `next`/`complete` calls made from
    // inside the destination's own `complete` handler are already ignored.
    this.closed = true;
    this.destination.complete();
  }
}
/**
 * Holds a producer function and runs it on demand.
 *
 * Constructing an Observable only stores the function. Each `subscribe`
 * call invokes it once, handing it the caller's observer wrapped in a
 * SafeSubscriber rather than the raw observer.
 */
class Observable<T> {
  constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {}

  /**
   * Runs the wrapped function against `observer`.
   *
   * @param observer Receives whatever the wrapped function emits.
   */
  subscribe(observer: Observer<T>): void {
    // The producer only ever sees the SafeSubscriber, which takes care of
    // ignoring calls made after completion.
    this._wrappedFunc(new SafeSubscriber<T>(observer));
  }
}
// Usage
// Constructing the Observable only stores the producer below; it runs each
// time `subscribe` is called. Because the observer is wrapped in a
// SafeSubscriber, 4 won't be nexted after we complete.
// The explicit <number> is needed: nothing in the callback lets the compiler
// infer T, so without it `source` would be typed as Observable<unknown>.
const source = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // this does nothing.
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment