Last active
August 13, 2023 14:40
-
-
Save TimonLukas/c0bb7e8f9bde9d3d74d6b776b5abdb72 to your computer and use it in GitHub Desktop.
TRPC custom message encoding in WebSocket w/ ID negotiation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export class WsClient { | |
#socket: WebSocket | |
#listeners: { | |
open: ((event: Event) => any)[] | |
close: ((event: CloseEvent) => any)[] | |
error: ((event: Event) => any)[] | |
message: ((event: MessageEvent) => any)[] | |
} = { | |
open: [], | |
close: [], | |
error: [], | |
message: [], | |
} | |
#methodsToMethodIds: Record<string, number> = {} | |
#methodIdsToMethods: Record<number, string> = {} | |
constructor(url: string) { | |
this.#socket = new WebSocket(url) | |
this.#socket.addEventListener("open", (e) => this.#listeners.open.forEach((callback) => callback(e))) | |
this.#socket.addEventListener("close", (e) => this.#listeners.close.forEach((callback) => callback(e))) | |
this.#socket.addEventListener("error", (e) => this.#listeners.error.forEach((callback) => callback(e))) | |
this.#socket.addEventListener("message", (e) => { | |
const data = JSON.parse(e.data) | |
if ("type" in data && typeof data.type === "string") { | |
switch (data.type) { | |
case "update-method-to-method-ids": | |
const { forwardMap, backwardMap } = data | |
this.#methodsToMethodIds = forwardMap | |
this.#methodIdsToMethods = backwardMap | |
console.log("Got new method IDs map with keys:", Object.keys(forwardMap)) | |
break | |
default: | |
throw new Error(`Unknown message type=${data.type} for message ${e.data}`) | |
} | |
} | |
if (!Array.isArray(data)) { | |
this.#listeners.message.forEach((callback) => callback(e)) | |
return | |
} | |
const [id, result] = data | |
const event = { ...e, data: JSON.stringify({ id, result })} | |
this.#listeners.message.forEach((callback) => callback(event)) | |
}) | |
} | |
addEventListener(event: string, callback: (e: Event) => any): this { | |
switch (event) { | |
case "open": | |
this.#listeners.open.push(callback) | |
break | |
case "close": | |
this.#listeners.close.push(callback) | |
break | |
case "error": | |
this.#listeners.error.push(callback) | |
break | |
case "message": | |
this.#listeners.message.push(callback) | |
break | |
default: | |
throw new Error(`Unexpected event type: '${event}'`) | |
} | |
return this | |
} | |
send(data: string): void { | |
const parsedData = JSON.parse(data) | |
console.log("sending", parsedData) | |
if (typeof parsedData.id !== "number" || typeof parsedData.method !== "string") { | |
this.#socket.send(data) | |
return | |
} | |
if (parsedData.method in this.#methodsToMethodIds) { | |
const serialized = typeof parsedData.params === "undefined" ? JSON.stringify([parsedData.id, this.#methodsToMethodIds[parsedData.method]]) : JSON.stringify([parsedData.id, this.#methodsToMethodIds[parsedData.method], parsedData.params]) | |
console.log(`Send serialized to: ${serialized}`) | |
this.#socket.send(serialized) | |
return | |
} else { | |
console.log(`Couldn't find method '${parsedData.method}' in keys:`, Object.keys(this.#methodsToMethodIds)) | |
} | |
let value = "" | |
if (typeof parsedData.params === "undefined") { | |
value = JSON.stringify([parsedData.id, parsedData.method]) | |
} else { | |
value = JSON.stringify([parsedData.id, parsedData.method, parsedData.params]) | |
} | |
this.#socket.send(value) | |
console.log("Sent value:", value) | |
} | |
close(code?: number | undefined, reason?: string | undefined): void { | |
this.#socket.close(code, reason) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { IncomingMessage, Server } from "http" | |
import { WebSocket, WebSocketServer, RawData } from "ws"; | |
const canHandleNonObjects = Symbol() | |
export class WsServer { | |
#server: WebSocketServer | |
#listeners: { | |
connection: ((socket: WebSocket, request: IncomingMessage) => Promise<any>)[] | |
} = { | |
connection: [], | |
} | |
#methodsToMethodIds: Record<string, number> = {} | |
#methodIdsToMethods: Record<number, string> = {} | |
#lastId = 0 | |
constructor(options: { | |
server: Server | |
}) { | |
this.#server = new WebSocketServer(options) | |
this.#server.on("connection", (socket, ...args) => { | |
const mySocket = new WsSocket(socket) | |
this.#sendMethodToMethodIdsMap(socket) | |
const messageHandler = (data: RawData, isBinary: boolean) => { | |
// eslint-disable-next-line @typescript-eslint/no-base-to-string | |
const results = JSON.parse(data.toString()) | |
if (Array.isArray(results)) { | |
const [id, method, params] = results | |
if (typeof method !== "undefined") { | |
let reformatted = "" | |
if (typeof method === "string") { | |
reformatted = JSON.stringify({id, method, params}) | |
this.#insertMethodToMethodId(method) | |
this.#sendMethodToMethodIdsMap(socket) | |
} else { | |
reformatted = JSON.stringify({id, params, method: this.#methodIdsToMethods[method]}) | |
console.log("Got request =", reformatted) | |
} | |
mySocket.forwardMessage(reformatted, false, messageHandler) | |
} | |
return | |
} | |
} | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
messageHandler[canHandleNonObjects] = true | |
mySocket.on("message", messageHandler) | |
this.#listeners.connection.forEach((callback) => callback(mySocket as unknown as WebSocket, ...args)) | |
}) | |
} | |
on(event: string, callback: (...args: any[]) => any): this { | |
switch(event) { | |
case "connection": | |
this.#listeners.connection.push(callback) | |
break | |
default: | |
throw new Error(`Unexpected event type: '${event}'`) | |
} | |
return this | |
} | |
#nextMethodId(): number { | |
return this.#lastId++ | |
} | |
#insertMethodToMethodId(method: string): number { | |
if (!(method in this.#methodsToMethodIds)) { | |
this.#methodsToMethodIds[method] = this.#nextMethodId() | |
this.#methodIdsToMethods[this.#methodsToMethodIds[method]] = method | |
} | |
return this.#methodsToMethodIds[method] | |
} | |
#sendMethodToMethodIdsMap(socket?: WebSocket): void { | |
if (Object.keys(this.#methodsToMethodIds).length === 0) { | |
return | |
} | |
const message = JSON.stringify({ | |
type: "update-method-to-method-ids", | |
forwardMap: this.#methodsToMethodIds, | |
backwardMap: this.#methodIdsToMethods, | |
}) | |
console.log("Sending method ID maps:", message) | |
if (typeof socket === "undefined") { | |
this.#server.clients.forEach((client) => client.send(message)) | |
return | |
} | |
socket.send(message) | |
} | |
} | |
class WsSocket { | |
#socket: WebSocket | |
#listeners: { | |
message: ((message: RawData | string, isBinary: boolean) => any)[] | |
error: ((error: Error) => any)[] | |
close: ((code: number, reason: Buffer) => any)[] | |
open: (() => any)[] | |
} = { | |
message: [], | |
error: [], | |
close: [], | |
open: [], | |
} | |
constructor(socket: WebSocket) { | |
this.#socket = socket | |
this.#socket.on("message", (data: RawData, isBinary: boolean) => { | |
this.#listeners.message.forEach((callback) => { | |
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | |
// @ts-ignore | |
if (callback[canHandleNonObjects]) { | |
// eslint-disable-next-line @typescript-eslint/no-base-to-string | |
callback(data.toString(), isBinary) | |
} | |
}) | |
}) | |
this.#socket.on("error", (error) => { | |
this.#listeners.error.forEach((callback) => callback(error)) | |
}) | |
this.#socket.on("close", (code: number, reason: Buffer) => { | |
this.#listeners.close.forEach((callback) => callback(code, reason)) | |
}) | |
this.#socket.on("open", () => { | |
this.#listeners.open.forEach((callback) => callback()) | |
}) | |
} | |
on(event: string, callback: (...args: any[]) => any): this { | |
switch (event) { | |
case "message": | |
this.#listeners.message.push(callback) | |
break | |
case "error": | |
this.#listeners.error.push(callback) | |
break | |
case "close": | |
this.#listeners.close.push(callback) | |
break | |
case "open": | |
this.#listeners.open.push(callback) | |
break | |
default: | |
throw new Error(`Unexpected event type: '${event}'`) | |
} | |
return this | |
} | |
off(event: string, callback: (...args: any[]) => any): this { | |
switch (event) { | |
case "close": | |
const index = this.#listeners.close.indexOf(callback) | |
if (index !== -1) { | |
this.#listeners.close.splice(index, 1) | |
} | |
break | |
default: | |
throw new Error(`Unexpected event type: '${event}'`) | |
} | |
return this | |
} | |
once(event: string, callback: (...args: any[]) => any): this { | |
const handleCallback = (...args: any[]) => { | |
callback(...args) | |
this.off(event, handleCallback) | |
} | |
this.on(event, handleCallback) | |
return this | |
} | |
send( | |
data: BufferLike, | |
options: { mask?: boolean | undefined; binary?: boolean | undefined; compress?: boolean | undefined; fin?: boolean | undefined }, | |
cb?: (err?: Error) => void, | |
) { | |
// eslint-disable-next-line @typescript-eslint/no-base-to-string | |
const parsedData = JSON.parse(data.toString()) | |
console.log("Server will send:", parsedData) | |
const reformatted = JSON.stringify([parsedData.id, parsedData.result]) | |
return this.#socket.send(reformatted, options, cb) | |
} | |
forwardMessage(message: RawData | string, isBinary: boolean, messageHandler: (message: RawData, isBinary: boolean) => any): void { | |
this.#listeners.message.forEach((callback) => { | |
if (callback !== messageHandler) { | |
callback(message, isBinary) | |
} | |
}) | |
} | |
} | |
type BufferLike = | |
| string | |
| Buffer | |
| DataView | |
| number | |
| ArrayBufferView | |
| Uint8Array | |
| ArrayBuffer | |
| SharedArrayBuffer | |
| { valueOf(): ArrayBuffer } | |
| { valueOf(): SharedArrayBuffer } | |
| { valueOf(): Uint8Array } | |
| { valueOf(): string } | |
| { [Symbol.toPrimitive](hint: string): string }; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a PoC meant to improve throughput in a WebSocket scenario by reducing message size. It's implemented through a custom
WebSocket
/WebSocketServer
that each implement only the API strictly necessary to get the PoC working. They transform data sent throughsend
and received through.on("message", (message) => { /* ... */ }
like this:is transformed into:
and transformed back before tRPC ever gets to see it. The ID mapping is not static, but updated at runtime as messages come in (meaning that IDs might have changed after a server restart). Updates of the ID mapping are sent to all clients, who can either still respond with normal object responses or use the array format to reduce size.
Usage:
WsClient
as theWebSocket
option tocreateWSClient
WsServer
instance as thewss
option toapplyWSSHandler