Skip to content

Instantly share code, notes, and snippets.

@mfbx9da4
Last active January 2, 2022 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mfbx9da4/9065154558bb2bbfc46864d7bc612391 to your computer and use it in GitHub Desktop.
Save mfbx9da4/9065154558bb2bbfc46864d7bc612391 to your computer and use it in GitHub Desktop.
Remote procedure calls (RPCs) using BroadcastChannel in Deno
// ---- START IMPORTS ----
/**
 * Extra details attached to a failed assertion: either a bag of properties
 * (optionally carrying the error `name`) or a bare `ErrorCode`, which is used
 * as the error `name`.
 */
export type AssertionExtra = (Record<string, unknown> & { name?: ErrorCode }) | ErrorCode

/**
 * Throws an `AssertionError` when `predicate` is falsy; otherwise narrows
 * `predicate` to truthy for the type checker.
 *
 * @param predicate Value that must be truthy.
 * @param message Message of the thrown error.
 * @param extra Properties copied onto the thrown error. A string is treated as
 *   the error `name`. When no `name` key is present it defaults to
 *   `ErrorCode.AssertionError`.
 * @throws AssertionError when `predicate` is falsy.
 */
export function assert(predicate: unknown, message: string, extra: AssertionExtra = {}): asserts predicate {
  if (predicate) return
  // Work on a copy so the default `name` is never written onto the caller's object.
  const details: Record<string, unknown> = typeof extra === 'string' ? { name: extra } : { ...extra }
  if (!('name' in details)) {
    details.name = ErrorCode.AssertionError
  }
  throw new AssertionError(message, details)
}
/**
 * Error raised by failed assertions.
 *
 * The `name` starts out as `'AssertionError'`; every own enumerable property of
 * `extra` is then copied onto the instance, so `extra` may override `name`
 * (or any other property) and attach additional fields.
 */
export class AssertionError extends Error {
  constructor(message: string, extra: any = {}) {
    super(message)
    // Later sources win: the default name is applied first, then `extra`.
    Object.assign(this, { name: 'AssertionError' }, extra)
  }
}
/**
 * A promise whose `resolve` and `reject` functions are exposed as instance
 * members, together with flags that can be read synchronously.
 *
 * The flags are updated synchronously by the first call to `resolve` or
 * `reject`; any later call is ignored, so exactly one of `isPending`,
 * `isFulfilled` and `isRejected` is true at any time.
 *
 * Note that `isFulfilled` records that `resolve` was called. If `resolve` is
 * given a thenable that later rejects, `promise` rejects while `isFulfilled`
 * stays true.
 */
export class DeferredPromise<T = void, E = any> {
  /** True until `resolve` or `reject` has been called. */
  isPending = true
  /** True once `resolve` was the first settle call. */
  isFulfilled = false
  /** True once `reject` was the first settle call. */
  isRejected = false
  /** Rejects `promise` with `x`; ignored after the first settle call. */
  reject: (x: E) => void = () => {}
  /** Resolves `promise` with `x`; ignored after the first settle call. */
  resolve: (x: T | PromiseLike<T>) => void = () => {}
  /** The promise controlled by `resolve` and `reject`. */
  promise: Promise<T>

  constructor() {
    // The executor runs synchronously, so both handlers are installed before
    // the constructor returns.
    this.promise = new Promise<T>((res, rej) => {
      this.resolve = value => {
        // A promise settles only once; keep the flags in step with that.
        if (!this.isPending) return
        this.isPending = false
        this.isFulfilled = true
        res(value)
      }
      this.reject = reason => {
        if (!this.isPending) return
        this.isPending = false
        this.isRejected = true
        rej(reason)
      }
    })
  }
}
// ---- END IMPORTS ----
/** Returns a fresh random UUID from the platform's `crypto` implementation. */
function uuid() {
  return crypto.randomUUID()
}
/** Union of the keys of `T` whose property type extends `Function`. */
export type MethodNames<T> = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]
/** `T` restricted to the properties selected by `MethodNames<T>`. */
export type PickMethods<T> = Pick<T, MethodNames<T>>
/** Maps every property of `T` through `Promisify`. */
type PromisifyMethods<T> = { [K in keyof T]: Promisify<T[K]> }
/**
 * For a function type: keeps the signature as-is when it already returns a
 * `Promise`, otherwise wraps the return type in `Promise`. Any non-function
 * type maps to `never`.
 */
type Promisify<T> = T extends (...args: infer Args) => infer Ret
? Ret extends Promise<unknown>
? (...args: Args) => Ret
: (...args: Args) => Promise<Ret>
: never
/** Shape of a request message: a correlation id, the key of `T` to call, and the call arguments. */
type Payload<T> = { requestId: string; fn: keyof T; args: unknown[] }
/**
 * Remote procedure calls between `BroadcastChannel` peers.
 *
 * One side calls `expose` to serve a set of methods on a named channel; any
 * other side calls `get` with the same channel id to obtain a proxy whose
 * methods post a request on the channel and settle with the matching response.
 *
 * Wire format: a request is `{ requestId, fn, args }`; a response is
 * `{ requestId, result }` or `{ requestId, error }`.
 */
export class BroadcastMethods<T extends Record<string, Function>> {
  /** Milliseconds a proxied call waits for a response before it rejects. */
  timeout = 5000

  /**
   * @param opts Optional settings. `opts.timeout` overrides the default
   *   response timeout in milliseconds; a missing or falsy value keeps the default.
   */
  constructor(opts?: { timeout?: number }) {
    if (opts?.timeout) {
      this.timeout = opts.timeout
    }
  }

  /**
   * Serves `methods` on the channel named `channelId`.
   *
   * Each incoming request is answered with `{ result, requestId }`, or with
   * `{ error, requestId }` when the request is invalid (missing `args`, unknown
   * method) or the method throws. Messages that are not requests, i.e. that
   * lack a string `requestId` or a string `fn`, are ignored because there is
   * no caller to answer.
   *
   * @param channelId Name of the `BroadcastChannel` to listen on.
   * @param methods Object whose functions may be invoked remotely.
   * @returns The underlying channel.
   * @throws AssertionError when `channelId` is falsy.
   */
  expose(channelId: string, methods: T) {
    assert(channelId, 'missing channelId')
    const chan = new BroadcastChannel(channelId)
    chan.onmessage = async e => {
      const data = e.data as Partial<Payload<T>> | null | undefined
      const requestId = data?.requestId
      const fn = data?.fn
      // Responses from other peers on the same channel carry no `fn`, and a
      // message without a requestId cannot be answered: skip both.
      if (typeof requestId !== 'string' || requestId === '' || typeof fn !== 'string') return
      try {
        // Validation happens inside the try block so that a bad request is
        // reported back to the caller. Thrown from this async handler it would
        // only become an unhandled rejection here while the caller timed out.
        const args = data?.args
        assert(Array.isArray(args), `missing args for "${channelId}"`)
        // Accept own and inherited methods, but not the members every object
        // inherits from Object.prototype (`toString`, `constructor`, ...).
        const candidate: unknown = methods[fn]
        const builtin: unknown = Reflect.get(Object.prototype, fn)
        assert(
          typeof candidate === 'function' && candidate !== builtin,
          `missing method "${fn}" for "${channelId}"`,
        )
        const result = await methods[fn](...args)
        chan.postMessage({ result, requestId })
      } catch (error) {
        chan.postMessage({ error, requestId })
      }
    }
    return chan
  }

  /**
   * Returns a proxy for the methods served on `channelId`.
   *
   * Calling `proxy.someMethod(...args)` opens a channel, posts a request and
   * returns a promise that resolves with the response's `result`, rejects with
   * the response's `error`, or rejects with a timeout `Error` when no response
   * arrives within `this.timeout` milliseconds.
   *
   * Symbol-keyed properties and `then` read as `undefined`, so the proxy is
   * not treated as a thenable when it is awaited or returned from an async
   * function.
   *
   * @param channelId Name of the `BroadcastChannel` the methods are served on.
   */
  get(channelId: string): PromisifyMethods<T> {
    const exec = (fn: keyof PromisifyMethods<T>, args: unknown[]) => {
      assert(channelId, 'missing channelId')
      const requestId = uuid()
      const chan = new BroadcastChannel(channelId)
      const promise = new DeferredPromise<unknown>()
      const timer = setTimeout(() => {
        if (promise.isPending) {
          chan.close()
          promise.reject(new Error(`Timeout for "${channelId}" method: "${String(fn)}"`))
        }
      }, this.timeout)
      // Stops waiting: cancels the pending timeout and closes the channel.
      const release = () => {
        clearTimeout(timer)
        chan.close()
      }
      chan.onmessage = e => {
        if (!promise.isPending) return release()
        // Other callers' traffic shares the channel; only our id matters.
        if (requestId !== e.data?.requestId) return
        release()
        if ('error' in e.data) {
          promise.reject(e.data.error)
        } else {
          promise.resolve(e.data.result)
        }
      }
      try {
        const payload: Payload<T> = { requestId, fn, args }
        chan.postMessage(payload)
      } catch (error) {
        // e.g. arguments that cannot be cloned: do not leave the channel and
        // the timer behind, and report the failure through the promise.
        release()
        promise.reject(error)
      }
      return promise.promise
    }
    return new Proxy(
      {},
      {
        get: (_, fn) => {
          if (typeof fn !== 'string' || fn === 'then') return undefined
          return (...args: unknown[]) => exec(fn as keyof PromisifyMethods<T>, args)
        },
      },
    ) as PromisifyMethods<T>
  }
}
/**
 * Example: serve an in-memory counter on a broadcast channel and drive it
 * through the proxy returned by `BroadcastMethods.get`.
 */
export async function usage() {
  // In-memory state owned by this instance; other instances reach it only
  // through the methods exposed below.
  let total = 0
  const handlers = {
    increment() {
      total += 1
      return total
    },
    decrement() {
      total -= 1
      return total
    },
    currentCount() {
      return total
    },
  }
  // This id names the channel, so it should be unique across all Deno instances.
  const channelName = 'some-uuid'
  const rpc = new BroadcastMethods<typeof handlers>()
  // Serve the counter methods on the channel for this specific id.
  rpc.expose(channelName, handlers)
  // Every call on the proxy is posted on the channel and answered by whichever
  // instance exposed the methods. If nobody answers within the timeout (the
  // counter was never exposed, or its instance has died) the call rejects
  // with a timeout error.
  const remoteCounter = rpc.get(channelName)
  const before = await remoteCounter.currentCount()
  console.log('currentCount', before)
  // Increment and decrement the counter on its origin instance.
  await remoteCounter.increment()
  await remoteCounter.increment()
  await remoteCounter.decrement()
  const after = await remoteCounter.currentCount()
  console.log('currentCount2', after)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment