Skip to content

Instantly share code, notes, and snippets.

@kriskowal
Created January 7, 2023 02:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kriskowal/c91006189f4aac26dea8f3f2d4815a95 to your computer and use it in GitHub Desktop.
Save kriskowal/c91006189f4aac26dea8f3f2d4815a95 to your computer and use it in GitHub Desktop.
import { makeStream, nullQueue } from '@endo/stream';
/**
* @template TValue
*/
const makePubSub = () => {
// Request pubsub async queue internals
let { promise: tailPromise, resolve: tailResolve } = makePromiseKit();
/**
* @param {TValue} value
*/
const pub = value => {
const { resolve, promise } = makePromiseKit();
tailResolve(Object.freeze({ value, promise }));
tailResolve = resolve;
// Unlike a queue, advance the read head for future subscribers.
tailPromise = promise;
};
const sub = () => {
// Capture the read head for the next published value.
let cursor = tailPromise;
return () => {
const promise = cursor.then(next => next.value);
cursor = cursor.then(next => next.promise);
return promise;
};
};
return harden({ pub, sub });
};
export const makeTopic = () => {
const { pub, sub } = makePubSub();
const publisher = makeStream(nullQueue, pub);
return harden({
publisher,
subscribe: () => makeStream(sub(), nullQueue),
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment