Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Created February 23, 2023 21:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/9f6d378d4336bc9f1074ccf3f53fca45 to your computer and use it in GitHub Desktop.
Save cowboyd/9f6d378d4336bc9f1074ccf3f53fca45 to your computer and use it in GitHub Desktop.
Stateful observer for guaranteed delivery of continuations.
import type { Observer, Resolve } from "../types.ts";
import { shift } from "../deps.ts";
export function createObservable<T>() {
let observers = new Map<Observer<T>, Resolve<T>>();
return {
notify(value: T) {
for (let dispatch of observers.values()) {
dispatch(value);
}
},
observe(): Observer<T> {
let events: T[] = [];
let consumers: Resolve<T>[] = [];
let observer = {
*[Symbol.iterator]() {
let event = events.pop();
if (event) {
return event;
} else {
return yield* shift<T>(function*(k) {
consumers.unshift(k);
})
}
},
drop() {
observers.delete(observer);
},
};
observers.set(observer, (event: T) => {
events.unshift(event);
while (events.length > 0 && consumers.length > 0) {
let consume = consumers.pop() as Resolve<T>;
let event = events.pop() as T;
consume(event);
}
});
return observer;
},
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment