Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Last active July 28, 2023 13:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cowboyd/4cda54885d856a9f21798df6939d6937 to your computer and use it in GitHub Desktop.
Save cowboyd/4cda54885d856a9f21798df6939d6937 to your computer and use it in GitHub Desktop.
A Queue that implements v3 subscription. Useful when working with low level subscription apis.
import type { Resolve, Subscription } from "./types.ts";
import { action } from "./instructions.ts";
export interface Queue<T, TClose> extends Subscription<T, TClose> {
add(item: T): void;
close(value: TClose): void;
}
export function createQueue<T, TClose>(): Queue<T, TClose> {
type Item = IteratorResult<T, TClose>;
let items: Item[] = [];
let consumers: Resolve<Item>[] = [];
function enqueue(item: Item) {
items.unshift(item);
while (items.length > 0 && consumers.length > 0) {
let consume = consumers.pop() as Resolve<Item>;
let top = items.pop() as Item;
consume(top);
}
}
return {
add: (value) => enqueue({ done: false, value }),
close: (value) => enqueue({ done: true, value }),
*next() {
let item = items.pop();
if (item) {
return item;
} else {
return yield* action<Item>(function* (resolve) {
consumers.unshift(resolve);
})
}
}
}
}
import type { Channel, Stream, Operation } from "./types.ts";
import { resource } from "./instructions.ts";
import { createQueue, type Queue } from "./queue.ts";
export function createChannel<T, TClose = void>(): Channel<T, TClose> {
let subscribers = new Set<Queue<T, TClose>>();
let output: Stream<T, TClose> = resource(function* Subscription(provide) {
let subscriber = createQueue<T, TClose>();
subscribers.add(subscriber);
try {
yield* provide(subscriber);
} finally {
subscribers.delete(subscriber);
}
});
let input = {
*send(value: T): Operation<void> {
for (let subscriber of [...subscribers]) {
subscriber.add(value);
}
},
*close(value: TClose) {
for (let subscriber of [...subscribers]) {
subscriber.close(value);
}
},
};
return { input, output };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment