Skip to content

Instantly share code, notes, and snippets.

@fabiancook
Last active May 11, 2021 09:05
Show Gist options
  • Save fabiancook/dbb86ddbef437c6f49910438ec34d883 to your computer and use it in GitHub Desktop.
Save fabiancook/dbb86ddbef437c6f49910438ec34d883 to your computer and use it in GitHub Desktop.
import { observe, Observable } from "./observe";
export function run(observable: Observable<string>) {
for await (const value of observe(observable)) {
console.log(value);
}
}
import { source } from "iterable";
export interface ObservableSubscription {
unsubscribe(): void;
}
export interface Observable<T> {
subscribe(onNext: (value: T) => void, onError?: (error: unknown) => void, onComplete?: () => void): unknown;
}
export function isObservableSubscription(value: unknown): value is ObservableSubscription {
function isObservableSubscriptionLike(value: unknown): value is Partial<ObservableSubscription> {
return !!value;
}
return (
isObservableSubscriptionLike(value) &&
typeof value === "function"
);
}
// I haven't tried this yet, but it would be similar to this to get it all going.
// If you look in history of this gist you will be able to see the usage without unsubscribe support.
export async function *observe<T>(observable: Observable<T>): AsyncIterable<T> {
const target = source<T>();
const subscription = observable.subscribe(
value => target.push(value),
error => target.throw(error),
() => target.close()
);
try {
yield* target;
} finally {
// We don't know what is returned, so could be a promise we need to wait on
if (isObservableSubscription(subscription)) {
await subscription.unsubscribe();
} else if (typeof subscription === "function") {
await subscription();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment