Skip to content

Instantly share code, notes, and snippets.

@qmzik
Created July 11, 2021 12:35
Show Gist options
  • Save qmzik/9cb3a52623950e2854ade37d3c99eaf2 to your computer and use it in GitHub Desktop.
Save qmzik/9cb3a52623950e2854ade37d3c99eaf2 to your computer and use it in GitHub Desktop.
/**
* Our subscription type. This is to manage teardowns.
*/
class Subscription {
private teardowns = new Set<() => void>();
add(teardown: () => void) {
this.teardowns.add(teardown);
}
unsubscribe() {
for (const teardown of this.teardowns) {
teardown();
}
this.teardowns.clear();
}
}
class SafeSubscriber<T> {
closed = false;
constructor(
private destination: Partial<Observer<T>>,
private subscription: Subscription,
) {
// Make sure that if the subscription is unsubscribed,
// we don't let any more notifications through this subscriber.
subscription.add(() => (this.closed = true));
}
next(value: T) {
if (!this.closed) {
this.destination.next?.(value);
}
}
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.();
this.subscription.unsubscribe();
}
}
error(err: any) {
if (!this.closed) {
this.closed = true;
this.destination.error?.(err);
this.subscription.unsubscribe();
}
}
}
class Observable<T> {
constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {}
subscribe(observer: Observer<T>) {
const subscription = new Subscription();
const subscriber = new SafeSubscriber(observer, subscription);
subscription.add(this._wrappedFunc(subscriber));
return subscription;
}
}
const helloSocket = new Observable<string>((subscriber) => {
const socket = new WebSocket('wss://echo.websocket.org');
socket.onopen = () => {
socket.send('Hello, World!');
};
socket.onmessage = (e) => {
subscriber.next(e.data);
};
socket.onclose = (e) => {
if (e.wasClean) {
subscriber.complete();
} else {
subscriber.error(new Error('Socket closed dirty!'));
}
};
return () => {
if (socket.readyState <= WebSocket.OPEN) {
socket.close();
}
};
});
const subscription = helloSocket.subscribe({
next: console.log,
complete: () => console.log('server closed'),
error: console.error,
});
// Later, we can unsubscribe!
subscription.unsubscribe();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment