Skip to content

Instantly share code, notes, and snippets.

@jabczyk
Created October 14, 2021 09:51
Show Gist options
  • Save jabczyk/29e26bc8cceea9939a7ec92d7c9b1717 to your computer and use it in GitHub Desktop.
Save jabczyk/29e26bc8cceea9939a7ec92d7c9b1717 to your computer and use it in GitHub Desktop.
Tiny wrapper for uWebSockets.js
import {
App,
TemplatedApp,
WebSocket,
WebSocketBehavior,
SHARED_COMPRESSOR,
// eslint-disable-next-line camelcase
us_listen_socket_close
} from 'uWebSockets.js'
import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events'
import assert from 'assert'
/** Module-wide UTF-8 decoder, reused for every incoming raw message. */
const textDecoder = new TextDecoder('utf-8')

/** Returns a promise that settles once a timer of `ms` milliseconds fires. */
const sleep = (ms: number) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms)
  })
/** Options accepted by the WSServer constructor. */
export interface ServerOptions {
// Port passed as-is to the uWebSockets.js `listen` call in WSServer.create().
// NOTE(review): optional here, but create() does not supply a default — confirm callers always set it.
port?: number
// Extra WebSocket behavior settings spread into the route config in create().
// They override the built-in idleTimeout / maxPayloadLength / compression values;
// the upgrade/open/close/message handlers are set after the spread and therefore win.
wsBehavior?: WebSocketBehavior
}
/**
 * Wire-level event exchanged with clients: a JSON array holding the operation
 * name followed by an optional payload.
 */
export type WSEvent<T = any> = [op: string, payload?: T]
/** Context object handed to listeners when a client message has been parsed. */
export interface MessageCtx<T = any> {
// Operation name: the first element of the received event array.
op: string
// Optional payload: the second element of the received event array.
payload?: T
// The socket the message arrived on.
ws: WebSocket
// Serializes the given event to JSON and sends it back on the same socket.
reply: (wsEvent: WSEvent) => Promise<boolean>
}
/**
 * Small event-driven wrapper around a uWebSockets.js app.
 *
 * Emitted events:
 *  - `connect` (ws)       — a socket was opened
 *  - `disconnect` (ws)    — a socket was closed
 *  - `message` (ctx)      — any well-formed client message
 *  - `<op>` (ctx)         — the same message, keyed by its operation name,
 *                           unless the name is one of the reserved events
 *
 * Client messages are JSON arrays of the form `[op, payload?]`; anything else
 * is silently dropped.
 */
export class WSServer extends EventEmitter {
  /**
   * Event names that client-supplied ops must never be dispatched under.
   * `connect` / `disconnect` / `message` are emitted by the server itself, and
   * `error` is special-cased by EventEmitter: emitting it with no listener
   * throws, which would let any client crash the process.
   */
  private static readonly RESERVED_OPS: readonly string[] = [
    'connect',
    'disconnect',
    'message',
    'error'
  ]

  private app: TemplatedApp
  /** Currently open sockets, keyed by the id assigned during upgrade. */
  private sockets: Map<string, WebSocket> = new Map()
  /** Listen token returned by uWebSockets.js; cleared by shutdown(). */
  private listenSocket: any
  /** Number of sockets currently open on this instance. */
  onlineCountInstance = 0

  /**
   * @param options Port and optional WebSocket behavior overrides.
   */
  constructor(private options: ServerOptions) {
    super()
  }

  /**
   * Builds the uWebSockets.js app, registers the WebSocket and HTTP routes and
   * starts listening on `options.port`.
   *
   * @returns This instance, for chaining.
   */
  create() {
    this.app = App()
      .ws('/*', {
        // Defaults; anything in options.wsBehavior overrides these three.
        idleTimeout: 32,
        maxPayloadLength: 1024 ** 2,
        compression: SHARED_COMPRESSOR,
        ...this.options.wsBehavior,
        // The handlers below come after the spread, so they always win.
        upgrade: (res, req, context) => {
          // Attach a fresh id as user data so open/close can key the socket map.
          const id = uuidv4()
          res.upgrade(
            { id },
            req.getHeader('sec-websocket-key'),
            req.getHeader('sec-websocket-protocol'),
            req.getHeader('sec-websocket-extensions'),
            context
          )
        },
        open: (ws) => {
          // Must be Map#set: a bracket assignment would only add a property to
          // the Map object, which Map#delete / Map#has never see.
          this.sockets.set(ws.id, ws)
          this.onlineCountInstance++
          // broadcast() publishes to this topic; without a subscription the
          // socket would never receive it.
          ws.subscribe('broadcast')
          this.emit('connect', ws)
        },
        close: (ws) => {
          this.sockets.delete(ws.id)
          this.onlineCountInstance--
          this.emit('disconnect', ws)
        },
        message: (ws, rawMessage, isBinary) => {
          const event = this._parseRawMessage(rawMessage)
          if (!event) return
          const [op, payload] = event
          // Replies reuse the binary flag of the message being answered.
          const reply = async (wsEvent: WSEvent) => {
            const data = JSON.stringify(wsEvent)
            return this._send(ws, data, isBinary)
          }
          const context: MessageCtx = { op, payload, ws, reply }
          // The op string is untrusted input: never let it trigger a reserved event.
          if (!WSServer.RESERVED_OPS.includes(op)) {
            this.emit(op, context)
          }
          this.emit('message', context)
        }
      })
      .any('/*', (res) => {
        // Plain HTTP requests get a fixed JSON identification response.
        res
          .writeHeader('Content-Type', 'application/json')
          .end(JSON.stringify({ name: 'uws-server' }))
      })
      .listen(this.options.port, (token) => {
        // NOTE(review): a falsy token is ignored here, so a failed listen is
        // silent — confirm whether callers need to be told.
        if (token) this.listenSocket = token
      })
    return this
  }

  /**
   * Publishes an event to every socket subscribed to the `broadcast` topic
   * (each socket is subscribed when it opens).
   *
   * @param wsEvent Event to serialize as JSON and publish.
   */
  broadcast(wsEvent: WSEvent) {
    const message = JSON.stringify(wsEvent)
    this.app.publish('broadcast', message, false, true)
  }

  /**
   * Closes the listen socket if one is open. Calling it again, or before a
   * successful listen, does nothing.
   */
  shutdown() {
    if (!this.listenSocket) return
    us_listen_socket_close(this.listenSocket)
    this.listenSocket = null
  }

  /**
   * Sends data on a socket, first waiting (polling every 50 ms) while more
   * than 1 MiB is buffered on it.
   *
   * @param ws Target socket.
   * @param data Message to send.
   * @param isBinary Whether to send as a binary frame.
   * @returns The result of `ws.send`, or `false` when the socket closed while
   *          waiting for its buffer to drain.
   */
  async _send(ws: WebSocket, data: any, isBinary = false) {
    // Loop instead of recursing so a long stall does not build a promise chain.
    while (ws.getBufferedAmount() > 1024 ** 2) {
      await sleep(50)
      // The close handler removes the socket from the map; once it is gone the
      // socket object must not be touched again.
      if (!this.sockets.has(ws.id)) return false
    }
    return ws.send(data, isBinary, true)
  }

  /**
   * Decodes and validates a raw client message.
   *
   * @param rawMessage Raw message bytes, decoded as UTF-8.
   * @returns The parsed `[op, payload?]` array, or `false` when the text is
   *          not valid JSON or is not an array of at most two elements whose
   *          first element is a string.
   */
  _parseRawMessage(rawMessage: ArrayBuffer): WSEvent | false {
    const asText = textDecoder.decode(rawMessage)
    try {
      const parsed: WSEvent = JSON.parse(asText)
      assert(
        Array.isArray(parsed) &&
          parsed.length <= 2 &&
          typeof parsed[0] === 'string',
        'Invalid message format'
      )
      return parsed
    } catch {
      // Malformed input from a client is expected; report it as "no event".
      return false
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment