Skip to content

Instantly share code, notes, and snippets.

@nomyfan
Last active September 4, 2022 05:23
Show Gist options
  • Save nomyfan/46bb577e0254a4a0ed7e4e032e790bf0 to your computer and use it in GitHub Desktop.
Save nomyfan/46bb577e0254a4a0ed7e4e032e790bf0 to your computer and use it in GitHub Desktop.
Basic implementation of observable
// https://youtu.be/m40cF91F8_A
interface Observer<T> {
next: (value: T) => void;
error: (error: any) => void;
complete: () => void;
}
interface Unsubscriable {
unsubscribe: () => void;
}
/**
* 确保observer被安全调用。
* 安全的定义:
* - error或者complete被调用之后,next、error、complete再调用不会有任何效果。
* - 取消订阅(调用ubsubcribe)之后,next、error、complete再调用不会有任何效果。
*/
class Subscriber<T> implements Observer<T> {
private closed = false;
constructor(private source: Observer<T>, private subscription: Subscription) {
subscription.add(() => (this.closed = true));
}
next(value: T) {
if (!this.closed) {
this.source.next(value);
}
}
error(error: any) {
if (!this.closed) {
this.source.error(error);
this.subscription.unsubscribe();
}
}
complete() {
if (!this.closed) {
this.source.complete();
this.subscription.unsubscribe();
}
}
}
type Teardown = () => void;
/**
* 管理所有的teandowns,在unsubscribe的时候指定所有的teardowns。
*/
class Subscription {
private teardowns: Teardown[] = [];
add(teardown: Teardown) {
this.teardowns.push(teardown);
}
unsubscribe() {
for (const teardown of this.teardowns) {
teardown();
}
this.teardowns = [];
}
}
class Observable<T> {
constructor(private init: (observer: Observer<T>) => Teardown) {}
subscribe(observer: Observer<T>): Unsubscriable {
const subscription = new Subscription();
const subscriber = new Subscriber(observer, subscription);
subscription.add(this.init(subscriber));
return subscription;
}
}
// 例子
console.clear();
const observable = new Observable((observer) => {
let i = 0;
const id = setInterval(() => {
observer.next(i++);
if (i > 3) {
observer.complete();
observer.next(9999999);
}
}, 1000);
return () => {
console.log('tearing down');
clearInterval(id);
};
});
const subscription = observable.subscribe({
next: (value) => {
console.log(value);
},
error: (error) => {
console.error(error);
},
complete: () => {
console.log('done');
},
});
setTimeout(() => {
subscription.unsubscribe();
}, 2200);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment