Skip to content

Instantly share code, notes, and snippets.

@TimonLukas
Last active August 13, 2023 14:40
Show Gist options
  • Save TimonLukas/c0bb7e8f9bde9d3d74d6b776b5abdb72 to your computer and use it in GitHub Desktop.
Save TimonLukas/c0bb7e8f9bde9d3d74d6b776b5abdb72 to your computer and use it in GitHub Desktop.
TRPC custom message encoding in WebSocket w/ ID negotiation
export class WsClient {
#socket: WebSocket
#listeners: {
open: ((event: Event) => any)[]
close: ((event: CloseEvent) => any)[]
error: ((event: Event) => any)[]
message: ((event: MessageEvent) => any)[]
} = {
open: [],
close: [],
error: [],
message: [],
}
#methodsToMethodIds: Record<string, number> = {}
#methodIdsToMethods: Record<number, string> = {}
constructor(url: string) {
this.#socket = new WebSocket(url)
this.#socket.addEventListener("open", (e) => this.#listeners.open.forEach((callback) => callback(e)))
this.#socket.addEventListener("close", (e) => this.#listeners.close.forEach((callback) => callback(e)))
this.#socket.addEventListener("error", (e) => this.#listeners.error.forEach((callback) => callback(e)))
this.#socket.addEventListener("message", (e) => {
const data = JSON.parse(e.data)
if ("type" in data && typeof data.type === "string") {
switch (data.type) {
case "update-method-to-method-ids":
const { forwardMap, backwardMap } = data
this.#methodsToMethodIds = forwardMap
this.#methodIdsToMethods = backwardMap
console.log("Got new method IDs map with keys:", Object.keys(forwardMap))
break
default:
throw new Error(`Unknown message type=${data.type} for message ${e.data}`)
}
}
if (!Array.isArray(data)) {
this.#listeners.message.forEach((callback) => callback(e))
return
}
const [id, result] = data
const event = { ...e, data: JSON.stringify({ id, result })}
this.#listeners.message.forEach((callback) => callback(event))
})
}
addEventListener(event: string, callback: (e: Event) => any): this {
switch (event) {
case "open":
this.#listeners.open.push(callback)
break
case "close":
this.#listeners.close.push(callback)
break
case "error":
this.#listeners.error.push(callback)
break
case "message":
this.#listeners.message.push(callback)
break
default:
throw new Error(`Unexpected event type: '${event}'`)
}
return this
}
send(data: string): void {
const parsedData = JSON.parse(data)
console.log("sending", parsedData)
if (typeof parsedData.id !== "number" || typeof parsedData.method !== "string") {
this.#socket.send(data)
return
}
if (parsedData.method in this.#methodsToMethodIds) {
const serialized = typeof parsedData.params === "undefined" ? JSON.stringify([parsedData.id, this.#methodsToMethodIds[parsedData.method]]) : JSON.stringify([parsedData.id, this.#methodsToMethodIds[parsedData.method], parsedData.params])
console.log(`Send serialized to: ${serialized}`)
this.#socket.send(serialized)
return
} else {
console.log(`Couldn't find method '${parsedData.method}' in keys:`, Object.keys(this.#methodsToMethodIds))
}
let value = ""
if (typeof parsedData.params === "undefined") {
value = JSON.stringify([parsedData.id, parsedData.method])
} else {
value = JSON.stringify([parsedData.id, parsedData.method, parsedData.params])
}
this.#socket.send(value)
console.log("Sent value:", value)
}
close(code?: number | undefined, reason?: string | undefined): void {
this.#socket.close(code, reason)
}
}
import { IncomingMessage, Server } from "http"
import { WebSocket, WebSocketServer, RawData } from "ws";
const canHandleNonObjects = Symbol()
export class WsServer {
#server: WebSocketServer
#listeners: {
connection: ((socket: WebSocket, request: IncomingMessage) => Promise<any>)[]
} = {
connection: [],
}
#methodsToMethodIds: Record<string, number> = {}
#methodIdsToMethods: Record<number, string> = {}
#lastId = 0
constructor(options: {
server: Server
}) {
this.#server = new WebSocketServer(options)
this.#server.on("connection", (socket, ...args) => {
const mySocket = new WsSocket(socket)
this.#sendMethodToMethodIdsMap(socket)
const messageHandler = (data: RawData, isBinary: boolean) => {
// eslint-disable-next-line @typescript-eslint/no-base-to-string
const results = JSON.parse(data.toString())
if (Array.isArray(results)) {
const [id, method, params] = results
if (typeof method !== "undefined") {
let reformatted = ""
if (typeof method === "string") {
reformatted = JSON.stringify({id, method, params})
this.#insertMethodToMethodId(method)
this.#sendMethodToMethodIdsMap(socket)
} else {
reformatted = JSON.stringify({id, params, method: this.#methodIdsToMethods[method]})
console.log("Got request =", reformatted)
}
mySocket.forwardMessage(reformatted, false, messageHandler)
}
return
}
}
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
messageHandler[canHandleNonObjects] = true
mySocket.on("message", messageHandler)
this.#listeners.connection.forEach((callback) => callback(mySocket as unknown as WebSocket, ...args))
})
}
on(event: string, callback: (...args: any[]) => any): this {
switch(event) {
case "connection":
this.#listeners.connection.push(callback)
break
default:
throw new Error(`Unexpected event type: '${event}'`)
}
return this
}
#nextMethodId(): number {
return this.#lastId++
}
#insertMethodToMethodId(method: string): number {
if (!(method in this.#methodsToMethodIds)) {
this.#methodsToMethodIds[method] = this.#nextMethodId()
this.#methodIdsToMethods[this.#methodsToMethodIds[method]] = method
}
return this.#methodsToMethodIds[method]
}
#sendMethodToMethodIdsMap(socket?: WebSocket): void {
if (Object.keys(this.#methodsToMethodIds).length === 0) {
return
}
const message = JSON.stringify({
type: "update-method-to-method-ids",
forwardMap: this.#methodsToMethodIds,
backwardMap: this.#methodIdsToMethods,
})
console.log("Sending method ID maps:", message)
if (typeof socket === "undefined") {
this.#server.clients.forEach((client) => client.send(message))
return
}
socket.send(message)
}
}
class WsSocket {
#socket: WebSocket
#listeners: {
message: ((message: RawData | string, isBinary: boolean) => any)[]
error: ((error: Error) => any)[]
close: ((code: number, reason: Buffer) => any)[]
open: (() => any)[]
} = {
message: [],
error: [],
close: [],
open: [],
}
constructor(socket: WebSocket) {
this.#socket = socket
this.#socket.on("message", (data: RawData, isBinary: boolean) => {
this.#listeners.message.forEach((callback) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
if (callback[canHandleNonObjects]) {
// eslint-disable-next-line @typescript-eslint/no-base-to-string
callback(data.toString(), isBinary)
}
})
})
this.#socket.on("error", (error) => {
this.#listeners.error.forEach((callback) => callback(error))
})
this.#socket.on("close", (code: number, reason: Buffer) => {
this.#listeners.close.forEach((callback) => callback(code, reason))
})
this.#socket.on("open", () => {
this.#listeners.open.forEach((callback) => callback())
})
}
on(event: string, callback: (...args: any[]) => any): this {
switch (event) {
case "message":
this.#listeners.message.push(callback)
break
case "error":
this.#listeners.error.push(callback)
break
case "close":
this.#listeners.close.push(callback)
break
case "open":
this.#listeners.open.push(callback)
break
default:
throw new Error(`Unexpected event type: '${event}'`)
}
return this
}
off(event: string, callback: (...args: any[]) => any): this {
switch (event) {
case "close":
const index = this.#listeners.close.indexOf(callback)
if (index !== -1) {
this.#listeners.close.splice(index, 1)
}
break
default:
throw new Error(`Unexpected event type: '${event}'`)
}
return this
}
once(event: string, callback: (...args: any[]) => any): this {
const handleCallback = (...args: any[]) => {
callback(...args)
this.off(event, handleCallback)
}
this.on(event, handleCallback)
return this
}
send(
data: BufferLike,
options: { mask?: boolean | undefined; binary?: boolean | undefined; compress?: boolean | undefined; fin?: boolean | undefined },
cb?: (err?: Error) => void,
) {
// eslint-disable-next-line @typescript-eslint/no-base-to-string
const parsedData = JSON.parse(data.toString())
console.log("Server will send:", parsedData)
const reformatted = JSON.stringify([parsedData.id, parsedData.result])
return this.#socket.send(reformatted, options, cb)
}
forwardMessage(message: RawData | string, isBinary: boolean, messageHandler: (message: RawData, isBinary: boolean) => any): void {
this.#listeners.message.forEach((callback) => {
if (callback !== messageHandler) {
callback(message, isBinary)
}
})
}
}
type BufferLike =
| string
| Buffer
| DataView
| number
| ArrayBufferView
| Uint8Array
| ArrayBuffer
| SharedArrayBuffer
| { valueOf(): ArrayBuffer }
| { valueOf(): SharedArrayBuffer }
| { valueOf(): Uint8Array }
| { valueOf(): string }
| { [Symbol.toPrimitive](hint: string): string };
@TimonLukas
Copy link
Author

TimonLukas commented Aug 13, 2023

This is a PoC meant to improve throughput in a WebSocket scenario by reducing message size. It's implemented through a custom WebSocket/WebSocketServer that each implement only the API strictly necessary to get the PoC working. They transform data sent through send and received through .on("message", (message) => { /* ... */ } like this:

{"id":3,"method":"subscription.stop"}  // 37 characters

is transformed into:

[3,1]  // 5 characters

and transformed back before tRPC ever gets to see it. The ID mapping is not static, but updated at runtime as messages come in (meaning that IDs might have changed after a server restart). Updates of the ID mapping are sent to all clients, who can either still respond with normal object responses or use the array format to reduce size.

Usage:

  • Pass WsClient as the WebSocket option to createWSClient
    const wsClient = createWSClient({
      url: `ws://localhost:2022`,
      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
      // @ts-ignore
      WebSocket: WsClient as unknown as ws.WebSocket,
    });
    const trpc = createTRPCProxyClient<AppRouter>({
      links: [
        wsLink({
          client: wsClient,
        }),
      ],
    });
  • Pass WsServer instance as the wss option to applyWSSHandler
    // http server
    const { server, listen } = createHTTPServer({
      router: appRouter,
      createContext,
    });
    
    // ws server
    const wss = new WsServer({ server }) as unknown as WebSocketServer;
    applyWSSHandler<AppRouter>({
      wss,
      router: appRouter,
      createContext,
    });

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment