Created
July 11, 2021 12:35
-
-
Save qmzik/9cb3a52623950e2854ade37d3c99eaf2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Our subscription type. This is to manage teardowns. | |
*/ | |
class Subscription { | |
private teardowns = new Set<() => void>(); | |
add(teardown: () => void) { | |
this.teardowns.add(teardown); | |
} | |
unsubscribe() { | |
for (const teardown of this.teardowns) { | |
teardown(); | |
} | |
this.teardowns.clear(); | |
} | |
} | |
class SafeSubscriber<T> { | |
closed = false; | |
constructor( | |
private destination: Partial<Observer<T>>, | |
private subscription: Subscription, | |
) { | |
// Make sure that if the subscription is unsubscribed, | |
// we don't let any more notifications through this subscriber. | |
subscription.add(() => (this.closed = true)); | |
} | |
next(value: T) { | |
if (!this.closed) { | |
this.destination.next?.(value); | |
} | |
} | |
complete() { | |
if (!this.closed) { | |
this.closed = true; | |
this.destination.complete?.(); | |
this.subscription.unsubscribe(); | |
} | |
} | |
error(err: any) { | |
if (!this.closed) { | |
this.closed = true; | |
this.destination.error?.(err); | |
this.subscription.unsubscribe(); | |
} | |
} | |
} | |
class Observable<T> { | |
constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {} | |
subscribe(observer: Observer<T>) { | |
const subscription = new Subscription(); | |
const subscriber = new SafeSubscriber(observer, subscription); | |
subscription.add(this._wrappedFunc(subscriber)); | |
return subscription; | |
} | |
} | |
const helloSocket = new Observable<string>((subscriber) => { | |
const socket = new WebSocket('wss://echo.websocket.org'); | |
socket.onopen = () => { | |
socket.send('Hello, World!'); | |
}; | |
socket.onmessage = (e) => { | |
subscriber.next(e.data); | |
}; | |
socket.onclose = (e) => { | |
if (e.wasClean) { | |
subscriber.complete(); | |
} else { | |
subscriber.error(new Error('Socket closed dirty!')); | |
} | |
}; | |
return () => { | |
if (socket.readyState <= WebSocket.OPEN) { | |
socket.close(); | |
} | |
}; | |
}); | |
const subscription = helloSocket.subscribe({ | |
next: console.log, | |
complete: () => console.log('server closed'), | |
error: console.error, | |
}); | |
// Later, we can unsubscribe! | |
subscription.unsubscribe(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment