Skip to content

Instantly share code, notes, and snippets.

@seia-soto
Last active April 26, 2023 07:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seia-soto/a8e1958bdf4f688d071c88924cc1a6ff to your computer and use it in GitHub Desktop.
Save seia-soto/a8e1958bdf4f688d071c88924cc1a6ff to your computer and use it in GitHub Desktop.
Fastify WebSocket (with Early-termination of WebSocket before upgrade in process of request handler)
// (typeRoot) types/fastify.d.ts
/* eslint-disable @typescript-eslint/consistent-type-definitions */
import 'fastify';
import {type WebSocketServer} from 'ws';
import {type WebSocketWithConnection} from '../src/modules/ws.js';
declare module 'fastify' {
interface FastifyInstance {
wss: WebSocketServer;
}
interface FastifyRequest {
session: TokenPayload;
resolveWebSocket?: () => Promise<WebSocketWithConnection>;
}
interface RouteOptions {
useWebSocket?: boolean;
wsHandler?: (wss: WebSocketServer, ws: WebSocketWithConnection, request: FastifyRequest) => unknown;
}
}
// (typeRoot) types/http.d.ts
/* eslint-disable @typescript-eslint/consistent-type-definitions */
import type * as http from 'http';
import type internal from 'stream';
import {type kWsPin} from '../src/modules/ws.js';
declare module 'http' {
export interface IncomingMessage extends IncomingMessage {
[kWsPin]?: {
isResolved?: true;
socket: internal.Duplex;
head: Buffer;
};
}
}
/**
* MIT License Copyright (c) 2023 HoJeong Go
*
* This plugin is the forked version of @fastify/websocket, and adds the following features:
* - Determine if server need to accept upgrade request before processing it
*
* To use this module, you need to register it to your instance.
* Registering this module will enable you to use `useWebSocket` and `wsHandler` optional properties in `routingOption` globally.
* You don't need to set `useWebSocket` if you set `wsHandler`.
*
* To accept `upgrade` request, call `request.resolveWebSocket` function.
* Note that this function will not available if the request is not upgrade request for WebSocket.
* You need to check the existence of function before using it.
*
* Now the WebSocket connection is established and you can process the WebSocket via the return value of `resolveWebSocket` and via `wsHandler` function.
*
* To drop the WebSocket connection, just return the value before calling `request.resolveWebSocket`.
* As `resolveWebSocket` defer the reply hijacking from the Fastify instance, you'll able to use `reply.send` or return value before calling the function.
* By doing so, WebSocket client will know 101 switching protocol was not returned and the server will closed the connection by returning response.
*/
import {type FastifyPluginAsyncTypebox} from '@fastify/type-provider-typebox';
import {type FastifyReply, type FastifyRequest, type RouteOptions} from 'fastify';
import fastifyPlugin from 'fastify-plugin';
import {ServerResponse} from 'http';
import type internal from 'stream';
import {WebSocketServer, createWebSocketStream, type WebSocket} from 'ws';
export type WebSocketWithConnection = {
socket: WebSocket;
} & internal.Duplex;
export const kWsPin = Symbol('ar1s-ws-pin');
const webSocketPlugin: FastifyPluginAsyncTypebox = async (fastify, _opts) => {
const wss = new WebSocketServer({noServer: true});
const resolveWebSocket = async (request: FastifyRequest, reply: FastifyReply, wsHandler?: RouteOptions['wsHandler']) => new Promise<WebSocketWithConnection>(resolve => {
void reply.hijack();
// Note that the existence of pin is already checked in `routeOptions.handler` proxy
const pin = request.raw[kWsPin]!;
wss.handleUpgrade(request.raw, pin.socket, pin.head, socket => {
pin.isResolved = true;
wss.emit('connection', socket, request.raw);
const connection = createWebSocketStream(socket) as WebSocketWithConnection;
connection.on('error', error => {
fastify.log.error(error);
});
socket.on('newListener', event => {
if (event === 'message') {
connection.resume();
}
});
connection.socket = socket;
if (typeof wsHandler === 'function') {
void wsHandler(wss, connection, request);
}
resolve(connection);
});
});
// Attach wss
fastify.decorate('wss', null);
fastify.decorateRequest('resolveWebSocket', null);
fastify.wss = wss;
// Add route option
fastify.addHook('onRoute', routeOptions => {
const useWebSocket = routeOptions.useWebSocket ?? typeof routeOptions.wsHandler === 'function';
if (routeOptions.method === 'HEAD') {
return;
}
if (useWebSocket && routeOptions.method !== 'GET') {
throw new Error('WebSocket connection should be made in GET method!');
}
routeOptions.handler = new Proxy(routeOptions.handler, {
apply(target, thisArg, argArray) {
const [request, reply] = argArray as [FastifyRequest, FastifyReply];
// Check if this request is upgrade request
const pin = request.raw[kWsPin];
if (!pin) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return Reflect.apply(target, thisArg, argArray);
}
if (!useWebSocket) {
// Close the web socket request
throw new Error('WebSocket connection is not available in current path!');
}
request.resolveWebSocket = async () => resolveWebSocket(request, reply, routeOptions.wsHandler);
return (Reflect.apply(target, thisArg, argArray) as Promise<unknown>)
.catch((error: unknown) => {
// If `pin.isResolved` is set to `true`, we know that the reply is already hijacked in the workflow.
if (pin.isResolved) {
pin.socket.destroy();
return;
}
throw error;
});
},
});
});
// Add close hook (see https://github.com/fastify/fastify-websocket/blob/master/index.js#L155)
fastify.server.close = new Proxy(fastify.server.close, {
apply(target, thisArg, argArray) {
for (const client of fastify.wss.clients) {
client.close();
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return Reflect.apply(target, thisArg, argArray);
},
});
fastify.server.on('upgrade', (request, socket, head) => {
if (request.method !== 'GET') {
throw new Error('WebSocket request should be made in GET request!');
}
const response = new ServerResponse(request);
request[kWsPin] = {
socket,
head,
};
// @ts-expect-error It's just ok.
response.assignSocket(socket);
fastify.routing(request, response);
});
};
export const useWebSocket = fastifyPlugin(webSocketPlugin);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment