Skip to content

Instantly share code, notes, and snippets.

@snuffyDev
Last active October 1, 2022 06:54
Show Gist options
  • Save snuffyDev/503ef25bdf9433813cf13e2b9ae71e16 to your computer and use it in GitHub Desktop.
Save snuffyDev/503ef25bdf9433813cf13e2b9ae71e16 to your computer and use it in GitHub Desktop.

MPMC in TypeScript

Module to provide a Multi-producer, Multi-consumer FIFO message channel.

Demo: https://codesandbox.io/s/mpmc-demo-7cy89ub

Basic usage

  import { mpmc } from '...';
  
  interface Message { uid: number, data: Record<string, unknown> };
  
  const [ sender, recv ] = mpmc<Messaage>();
  
  // Listen to any incoming messages
  const unsubscribe = recv.subscribe((msg) => {
    // ...do something with data
  });
  
  sender.send({ uid: 123, data: {} });
  
  export const sender2 = sender.clone();
  
  // Close the channel for every sender and receiver
  sender2.close();
  // Stop listening to any incoming messages;
  unsubscribe()
export type Result<T> = T;
export type SyncResult<T> = T | Error;
export type Receiver<T> = {
/**
* Gets the state of the channel that owns this Receiver.
*
* Returns either `OPEN` or `CLOSED`
*
* @readonly
* @type {State}
*/
get state(): State;
/**
* Returns a new `Receiver`
*/
subscribe: (cb: Subscriber<T>) => Result<() => void>;
/**
* Closes the channel for all senders and receivers.
*/
close: () => void;
};
export type Sender<T> = {
/**
* Gets the state of the channel that owns this Sender.
*
* Returns either `OPEN` or `CLOSED`
*
* @readonly
* @type {State}
*/
get state(): State;
send: (data: T) => void;
/**
* Returns a new `Sender`
*/
clone: () => Sender<T>;
/**
* Closes the channel for all senders and receivers.
*/
close: () => void;
};
type State = "OPEN" | "CLOSED";
type Unsubscriber = () => void;
type Subscriber<T> = (value: T | State) => Unsubscriber | void;
type SubscribeFn<T> = (callback: Subscriber<T>) => Result<Unsubscriber>;
/**
* Creates a new 'Multi-Producer/Multi-Consumer' FIFO channel.
*
* `sender` can be cloned multiple times and will only ever send to the receiver
* returned from invoking this function.
*
* `receiver` can be subscribed to from
* multiple locations.
*
* @export
* @template T
* @return {*} {[Sender<T>, Receiver<T>]}
*/
export function mpmc<T = unknown>(): [Sender<T>, Receiver<T>] {
let subscribers = new Set<Subscriber<T>>([]);
let state: State = "OPEN";
const sender: Sender<T> = _sender();
const receiver: Receiver<T> = _receiver();
function _receiver() {
const close = () => {
if (state !== "CLOSED") {
state = "CLOSED";
}
// Clean up subscribers before clearing
for (const callback of subscribers.values()) {
const unsubFn = callback("CLOSED");
if (typeof unsubFn === "function") {
unsubFn();
}
}
subscribers.clear();
};
const subscribe: SubscribeFn<T> = (cb) => {
try {
if (state === "CLOSED") {
throw new Error("Cannot subscribe to a closed channel");
}
subscribers.add(cb);
return () => {
subscribers.delete(cb);
if (subscribers.size === 0) {
subscribers = null;
}
};
} catch (err) {
console.error(err);
}
};
return {
get state() {
return state;
},
close,
subscribe,
};
}
function _sender() {
const close = () => {
try {
if (state === "CLOSED") {
throw new Error("Cannot close an already closed channel!");
}
// Clean-up subscribers, notify of channel closure,
// and run any returned functions
for (const callback of subscribers.values()) {
const unsubFn = callback("CLOSED");
if (typeof unsubFn === "function") {
unsubFn();
}
}
subscribers.clear();
state = "CLOSED";
subscribers.clear();
return state;
} catch (err) {
console.error(err);
}
};
return {
get state() {
return state;
},
close,
send: (data: T) => {
try {
if (state === "CLOSED") {
throw new Error("Cannot send on a closed channel!");
}
for (const callback of subscribers.values()) {
callback(data);
}
return;
} catch (err) {
console.error(err);
}
},
clone: () => {
return _sender();
},
};
}
return [sender, receiver];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment