Skip to content

Instantly share code, notes, and snippets.

@jquense
Created December 5, 2022 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jquense/0feaabbd51e8e6ff8516d8fb8838afa7 to your computer and use it in GitHub Desktop.
Save jquense/0feaabbd51e8e6ff8516d8fb8838afa7 to your computer and use it in GitHub Desktop.
tiny observable implementation
export interface Subscription {
readonly closed: boolean;
unsubscribe: () => void;
}
export interface Observer<T> {
closed: boolean;
error: (error: any) => void;
complete: () => void;
next: (value: T) => void;
}
export type SubscriberFunction<T> = (
observer: Observer<T>,
) => { unsubscribe: () => void } | void;
export default class Observable<T> {
private _subscribe: SubscriberFunction<T>;
public constructor(subscribe: SubscriberFunction<T>) {
this._subscribe = subscribe;
}
subscribe(
nextOrObserver: Partial<Observer<T>> | ((value?: T) => void),
error?: (errorValue: Error) => void,
complete?: () => void,
) {
let closed = false;
const observer =
typeof nextOrObserver === 'function'
? { next: nextOrObserver, error, complete }
: nextOrObserver;
// eslint-disable-next-line prefer-const
let subscriber: void | { unsubscribe: () => void };
const subscription: Subscription = {
get closed() {
return closed;
},
unsubscribe() {
if (closed) return;
closed = true;
subscriber?.unsubscribe();
},
};
subscriber = this._subscribe({
closed,
error(err?: any) {
if (closed) return;
subscription.unsubscribe();
observer.error?.(err);
},
complete() {
if (closed) return;
try {
subscription.unsubscribe();
observer.complete?.();
} catch (err) {
subscription.unsubscribe();
observer.error?.(err);
}
},
next(value: T) {
if (closed) return;
try {
observer.next?.(value);
} catch (err) {
subscription.unsubscribe();
observer.error?.(err);
}
},
});
return subscription;
}
}
@jquense
Copy link
Author

jquense commented Dec 5, 2022

Examples of use

function fromIterable<T>(source: Iterable<T>): Observable<T> {
  return new Observable<T>(observer => { 
     for (let item of source) {
       observer.next(item)
     }
     observer.complete()
  })
}

const observable = fromIterable([1, 2, 3, 4])

function *generateNumbers() {
  let i = 0
  while (true) yield ++i
}
const infiniteObservable = fromIterable(generateNumbers())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment