Last active
September 4, 2022 05:23
-
-
Save nomyfan/46bb577e0254a4a0ed7e4e032e790bf0 to your computer and use it in GitHub Desktop.
Basic implementation of observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://youtu.be/m40cF91F8_A
/**
 * The set of callbacks a consumer hands to `Observable.subscribe`.
 *
 * @typeParam T - type of the values passed to `next`.
 */
interface Observer<T> {
  /** Handler invoked with a value. */
  next: (value: T) => void;
  /** Handler invoked with an error. */
  error: (error: any) => void;
  /** Handler invoked with no arguments to signal completion. */
  complete: () => void;
}
/**
 * Handle returned by `Observable.subscribe`; it exposes a single
 * `unsubscribe` method.
 */
interface Unsubscriable {
  /** Cancels the subscription this handle belongs to. */
  unsubscribe: () => void;
}
/**
 * Wraps an observer so that it is always invoked safely.
 *
 * "Safe" means:
 * - once `error` or `complete` has been called, further calls to `next`,
 *   `error` or `complete` have no effect;
 * - once the subscription has been unsubscribed, further calls to `next`,
 *   `error` or `complete` have no effect.
 */
class Subscriber<T> implements Observer<T> {
  private closed = false;

  /**
   * @param source - the observer that receives the guarded notifications.
   * @param subscription - the subscription whose `unsubscribe` closes this
   *   subscriber, and which is unsubscribed on `error`/`complete`.
   */
  constructor(
    private readonly source: Observer<T>,
    private readonly subscription: Subscription,
  ) {
    // Closing is also driven by the subscription, so an external
    // unsubscribe() silences this subscriber.
    subscription.add(() => {
      this.closed = true;
    });
  }

  /** Forwards `value` to the wrapped observer unless already closed. */
  next(value: T) {
    if (this.closed) {
      return;
    }
    this.source.next(value);
  }

  /**
   * Forwards `error` to the wrapped observer once, then unsubscribes.
   * No-op if already closed.
   */
  error(error: any) {
    if (this.closed) {
      return;
    }
    // Close before notifying so re-entrant calls made from inside the
    // handler are ignored.
    this.closed = true;
    try {
      this.source.error(error);
    } finally {
      // Run the teardowns even if the handler throws.
      this.subscription.unsubscribe();
    }
  }

  /**
   * Notifies the wrapped observer of completion once, then unsubscribes.
   * No-op if already closed.
   */
  complete() {
    if (this.closed) {
      return;
    }
    // Close before notifying so re-entrant calls made from inside the
    // handler are ignored.
    this.closed = true;
    try {
      this.source.complete();
    } finally {
      // Run the teardowns even if the handler throws.
      this.subscription.unsubscribe();
    }
  }
}
/** A cleanup callback executed when a subscription is torn down. */
type Teardown = () => void;

/**
 * Manages all teardowns and executes every one of them on `unsubscribe`.
 */
class Subscription {
  private teardowns: Teardown[] = [];
  private closed = false;

  /**
   * Registers a teardown to run on `unsubscribe`.
   *
   * If the subscription has already been unsubscribed the teardown is run
   * immediately, so cleanup registered late is never lost.
   */
  add(teardown: Teardown) {
    if (this.closed) {
      teardown();
      return;
    }
    this.teardowns.push(teardown);
  }

  /**
   * Runs every registered teardown exactly once, in registration order.
   * Subsequent calls (including re-entrant calls made by a teardown) are
   * no-ops.
   *
   * @throws the first error thrown by a teardown, after all remaining
   *   teardowns have been given the chance to run.
   */
  unsubscribe() {
    if (this.closed) {
      return;
    }
    this.closed = true;

    // Detach the list before iterating so re-entrant calls cannot re-run it.
    const pending = this.teardowns;
    this.teardowns = [];

    let failed = false;
    let firstError: unknown;
    for (const teardown of pending) {
      try {
        teardown();
      } catch (err) {
        // Keep going so one failing teardown cannot leak the others.
        if (!failed) {
          failed = true;
          firstError = err;
        }
      }
    }
    if (failed) {
      throw firstError;
    }
  }
}
/**
 * A source of values described by an `init` function. `init` is invoked on
 * every `subscribe` call with a guarded observer and returns the teardown
 * that releases whatever it set up.
 */
class Observable<T> {
  /**
   * @param init - producer function; receives the observer to notify and
   *   returns the teardown for the resources it allocated.
   */
  constructor(private init: (observer: Observer<T>) => Teardown) {}

  /**
   * Starts the producer for `observer`.
   *
   * @param observer - callbacks that receive the notifications.
   * @returns a handle whose `unsubscribe` runs the producer's teardown.
   */
  subscribe(observer: Observer<T>): Unsubscriable {
    const subscription = new Subscription();

    // Track closure locally: `init` may call complete()/error() synchronously,
    // which unsubscribes before its teardown has been registered.
    const state = { closed: false };
    subscription.add(() => {
      state.closed = true;
    });

    const subscriber = new Subscriber(observer, subscription);
    const teardown = this.init(subscriber);

    if (state.closed) {
      // Already torn down during `init`: release the resources right away.
      teardown();
    } else {
      subscription.add(teardown);
    }
    return subscription;
  }
}
// Example
console.clear();

// Emits 0, 1, 2, ... once per second and completes after the fourth value.
const observable = new Observable<number>((observer) => {
  let i = 0;
  const id = setInterval(() => {
    observer.next(i++);
    if (i > 3) {
      observer.complete();
      // Sent after completion on purpose: the guarded observer is expected
      // to drop it.
      observer.next(9999999);
    }
  }, 1000);

  // Teardown: stop the timer.
  return () => {
    console.log('tearing down');
    clearInterval(id);
  };
});

const subscription = observable.subscribe({
  next: (value) => {
    console.log(value);
  },
  error: (error) => {
    console.error(error);
  },
  complete: () => {
    console.log('done');
  },
});

// Unsubscribe after 2.2 s, i.e. after the ticks at 1 s and 2 s. With this
// delay the interval is cleared before `i > 3`, so the completion branch
// above only runs if this timeout is removed or lengthened.
setTimeout(() => {
  subscription.unsubscribe();
}, 2200);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment