Skip to content

Instantly share code, notes, and snippets.

@randallb
Last active December 31, 2023 02:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save randallb/a0fb090d76cf58504ff73c6485f094ea to your computer and use it in GitHub Desktop.
Save randallb/a0fb090d76cf58504ff73c6485f094ea to your computer and use it in GitHub Desktop.
Shared worker shim + promised based insanity

Promised based typesafe shared worker messaging

I built this thing, it's kind of insane. In the worker, you can call it almost like any other async function... including type safety.

I don't really have time to explain more, but maybe i will later.

import BfSharedWorker from "./main.ts"
const worker = new BfSharedWorker();
const result = await worker.adder(1,2);
const typeErrorLol = await worker.adder("lol", 2);
class SharedWorkerShim extends EventTarget implements SharedWorker {
port: MessagePort;
private backingWorker: Worker;
constructor(url: string | URL, options: string | WorkerOptions = {}) {
super();
const { port1, port2 } = new MessageChannel();
this.port = new Proxy(port1, {
get: (target, property) => {
if (property === "start") {
return () => {
target["start"](); // Call the original start method
// Dispatch a fake 'onconnect' event when start is called
setTimeout(() => {
this.backingWorker.postMessage({
type: "onconnect",
ports: [port2],
}, [port2]);
}, 0);
};
}
// @ts-expect-error we're doing pretty crazy stuff.
return target[property];
},
});
if (typeof options === "string") {
options = { name: options };
}
options.type = "module";
this.backingWorker = new Worker(url, options);
}
onerror(ev: ErrorEvent) {
console.error(ev);
}
close() {
this.port.close();
this.backingWorker.terminate();
this.backingWorker.onmessage = null;
this.backingWorker.onerror = null;
}
}
if (typeof SharedWorker === "undefined") {
globalThis.SharedWorker = SharedWorkerShim;
}
type PublicMethodKeys<T> = {
[K in keyof T]: T[K] extends (...args: infer P) => infer R
? (P extends unknown[] ? (R extends unknown ? K : never) : never)
: never;
}[keyof T];
// Maps each method of T to its argument types as a tuple.
type MethodArguments<T> = {
[K in keyof T]: T[K] extends (...args: infer P) => unknown ? P : never;
};
// Maps each method of T to its return type.
type MethodReturnTypes<T> = {
[K in keyof T]: T[K] extends (...args: unknown[]) => infer R ? R : never;
};
type WorkerMessage<T extends BfSharedWorker<T>> = {
type: PublicMethodKeys<T>;
args: MethodArguments<T>[PublicMethodKeys<T>];
};
export enum BfSharedWorkerTypes {
"MAIN_WORKER" = "MAIN_WORKER",
"MAIN_THREAD" = "MAIN_THREAD",
"IMPORTED_WORKER" = "IMPORTED_WORKER",
"IMPORTED_MAIN_THREAD" = "IMPORTED_MAIN_THREAD",
}
export default class BfSharedWorker<T extends BfSharedWorker<T>> {
[key: string | symbol]: unknown;
protected mainUrl = "/resources/workers/bf-shared-worker.ts"; // my server and my import map both resolve this correctly.
protected worker?: SharedWorker;
protected supportsRunningAsMainThread = false;
protected supportsRunningAsWorker = true;
protected workerName = this.constructor.name;
private get workerType() {
const isWorker = typeof globalThis.window === "undefined";
const isMainRunningInMainThread = typeof globalThis.window !== "undefined";
const isMain = import.meta.main;
const isImported = !import.meta.main;
if (isMain && isMainRunningInMainThread) {
return BfSharedWorkerTypes.MAIN_THREAD;
}
if (isMain && !isMainRunningInMainThread) {
return BfSharedWorkerTypes.MAIN_WORKER;
}
if (isImported && isWorker) {
return BfSharedWorkerTypes.IMPORTED_WORKER;
}
return BfSharedWorkerTypes.IMPORTED_MAIN_THREAD;
}
constructor(...args: Array<unknown>) {
console.log("Constructor", this.workerType);
switch (this.workerType) {
case BfSharedWorkerTypes.IMPORTED_MAIN_THREAD:
case BfSharedWorkerTypes.IMPORTED_WORKER:
return this.initAsImported(...args);
case BfSharedWorkerTypes.MAIN_THREAD:
return this.initAsMainThread(...args);
case BfSharedWorkerTypes.MAIN_WORKER:
return this.initAsMainWorker(...args);
}
}
private initAsMainThread(...args: Array<unknown>) {
if (!this.supportsRunningAsMainThread) {
throw new Error(
"This worker does not support running as main thread",
);
}
return this;
}
private initAsImported(...args: Array<unknown>) {
const url = new URL(import.meta.resolve(this.mainUrl));
this.worker = new SharedWorker(url, this.workerName);
// Start the port for communication with the SharedWorker
console.log("going to start");
this.worker.port.start();
const thisSelf: T = this as unknown as T; // Capture the actual type, including subclasses
return new Proxy(thisSelf, {
get(target, prop, receiver) {
if (typeof prop !== "string") {
return Reflect.get(target, prop, receiver);
}
const origMethod = target[prop];
if (typeof origMethod === "function" && prop in target) {
// Intercept method calls
return (...methodArgs: unknown[]) => {
// Redirect the call through sendMessage
// Type safety for arguments is limited
return thisSelf.sendMessage(
prop as PublicMethodKeys<T>,
methodArgs as MethodArguments<T>[PublicMethodKeys<T>],
);
};
}
return Reflect.get(target, prop, receiver);
},
});
}
private initAsMainWorker(...args: Array<unknown>) {
console.log("initAsMainWorker", this.workerType);
globalThis.addEventListener("connect", (e) => {
this.onConnect((e as MessageEvent).ports as MessagePort[]);
});
globalThis.addEventListener("message", (e) => {
if (e.data.type === "onconnect") {
this.onConnect(e.data.ports as MessagePort[]);
} else {
console.log("worker message", e);
}
});
return this;
}
private onConnect(ports: MessagePort[]) {
console.log("onConnect");
const port = ports[0];
port.onmessage = this.receiveMessage.bind(this);
}
private isFunctionProperty<K extends keyof BfSharedWorker<T>>(
prop: K
): this is BfSharedWorker<T> & Record<K, (...args: unknown[]) => unknown> {
return typeof this[prop] === 'function';
}
private async receiveMessage(
e: MessageEvent<{type: PublicMethodKeys<T>, args: MethodArguments<T>[PublicMethodKeys<T>], port: MessagePort}>,
) {
const { type, args, port } = e.data;
try {
if (type in this && this.isFunctionProperty(type)) {
// TypeScript should now recognize 'this[type]' as a function
const returnable = await this[type](...args);
port.postMessage(returnable);
}
throw new Error(`Method ${String(type)} not found`);
} catch (error) {
port.postMessage({ error });
}
}
private sendMessage<K extends PublicMethodKeys<this>>(
type: K,
args: MethodArguments<this>[K],
transferrables: Transferable[] = [],
): Promise<MethodReturnTypes<this>[K]> {
// Implementation of sendMessage to handle the proxied method calls
return new Promise((resolve, reject) => {
const { port1, port2 } = new MessageChannel();
const txfr = [
port2,
...transferrables,
];
port1.onmessage = (e) => {
if (e.data.error) {
reject(e.data.error);
} else {
resolve(e.data);
}
};
if (this.worker) {
console.log("sending message", type, args);
return this.worker.port.postMessage({ port: port2, type, args }, txfr);
}
reject(new Error("Worker not initialized"));
});
}
// these would normally be extended in a new worker
ping() {
console.log("ping");
return "pong";
}
adder(a: number, b: number) {
return a + b;
}
}
if (import.meta.main) {
const worker = new BfSharedWorker();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment