Skip to content

Instantly share code, notes, and snippets.

@marcus-sa
Last active May 23, 2022 22:17
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcus-sa/49be090f5d79025795b4dbab016d50e5 to your computer and use it in GitHub Desktop.
Save marcus-sa/49be090f5d79025795b4dbab016d50e5 to your computer and use it in GitHub Desktop.
Deepkit WebSocket RPC Cloudflare Workers Interconnection
import { createTestingApp } from '@deepkit/framework';
import { ControllerSymbol, rpc } from '@deepkit/rpc';
import { entity } from '@deepkit/type';
import {
webSocketFetchRequestHandler,
CloudflareWorkerRpcWebSocketClient,
} from '../src';
// https://miniflare.dev/testing/jest
// End-to-end check: an RPC controller registered in a testing app is reached
// through the WebSocket fetch handler and the Cloudflare Worker RPC client.
test('client and server', async () => {
// Typed handle used by the client to address the controller registered below.
const UserController = ControllerSymbol<Controller>('test');
@entity.name('user')
class User {}
@rpc.controller(UserController)
class Controller {
@rpc.action()
getUser(): User {
return new User();
}
}
const { app } = createTestingApp({
controllers: [Controller],
});
// Simulate an incoming WebSocket upgrade request.
const request = new Request('http://localhost/', {
headers: {
Upgrade: 'websocket',
},
});
// The handler's response is handed directly to the client instead of a URL.
const response = webSocketFetchRequestHandler({
request,
app,
});
const client = new CloudflareWorkerRpcWebSocketClient(response);
await client.connect();
const controller = client.controller(UserController);
// The remote action is expected to resolve to a User instance.
await expect(controller.getUser()).resolves.toBeInstanceOf(User);
});
import type {
ClientTransportAdapter,
TransportConnectionHooks,
} from '@deepkit/rpc';
import { RpcClient } from '@deepkit/rpc';
import type { ClassType } from '@deepkit/core';
/**
 * What the client adapter connects through: either a URL string, which the
 * adapter fetches with an `Upgrade: websocket` header, or a `Response` whose
 * `webSocket` property is read directly.
 */
export type CloudflareWorkerRpcWebSocketClientOptions = string | Response;
// https://developers.cloudflare.com/workers/learning/using-websockets#writing-a-websocket-client
/**
 * Client transport adapter that obtains a WebSocket either by fetching a URL
 * with an `Upgrade: websocket` header or from a `Response` that already
 * carries a `webSocket`, and wires it to the given connection hooks.
 */
export class CloudflareWorkerRpcWebSocketClientAdapter
  implements ClientTransportAdapter
{
  /**
   * @param options URL to fetch, or a `Response` carrying a `webSocket`.
   */
  constructor(public options: CloudflareWorkerRpcWebSocketClientOptions) {}

  /**
   * Rewrites a `ws:`/`wss:` scheme to `http:`/`https:`; other URLs are
   * returned unchanged. The upgrade is requested through `fetch()`, which is
   * an HTTP API: Workers either reject unknown schemes or (on older
   * compatibility dates) treat them all as plain `http:`.
   */
  private static toFetchUrl(url: string): string {
    return url.replace(/^ws(s?):/i, 'http$1:');
  }

  /**
   * Converts WebSocket message data to bytes.
   *
   * Strings (text frames) are UTF-8 encoded. Passing a string straight to
   * `new Uint8Array(...)` would interpret it as a length (e.g. `"5"` becomes
   * five zero bytes) and silently corrupt the payload.
   */
  private static toBytes(data: unknown): Uint8Array {
    if (typeof data === 'string') return new TextEncoder().encode(data);
    if (ArrayBuffer.isView(data)) {
      return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
    }
    return new Uint8Array(data as ArrayBuffer);
  }

  /** Address reported to the connection: the configured URL or the response's URL. */
  private getURL(): string {
    if (typeof this.options === 'string') return this.options;
    return this.options.url;
  }

  /**
   * Resolves the WebSocket to use for this connection.
   *
   * @throws Error if the response carries no `webSocket`.
   */
  private async getWebSocketClient(): Promise<WebSocket> {
    const response =
      typeof this.options === 'string'
        ? await fetch(
            CloudflareWorkerRpcWebSocketClientAdapter.toFetchUrl(this.options),
            { headers: { Upgrade: 'websocket' } },
          )
        : this.options;

    // If the WebSocket handshake completed successfully, then the
    // response has a `webSocket` property. The property is not part of the
    // standard `Response` type, hence the narrow cast.
    const ws = (response as Response & { webSocket?: WebSocket | null })
      .webSocket;
    if (!ws) {
      throw new Error("Server didn't accept WebSocket");
    }
    return ws;
  }

  /**
   * Obtains the socket, accepts it, forwards its close/error/message events
   * to `connection`, and reports the connection as established.
   *
   * @param connection Hooks notified about transport events.
   * @throws Error if no WebSocket could be obtained.
   */
  async connect(connection: TransportConnectionHooks): Promise<void> {
    const ws = await this.getWebSocketClient();

    // Call accept() to indicate that you'll be handling the socket here
    // in JavaScript, as opposed to returning it on to a client.
    (ws as WebSocket & { accept(): void }).accept();

    ws.addEventListener('close', () => connection.onClose());
    ws.addEventListener('error', (err: any) => connection.onError(err));
    ws.addEventListener('message', (event: MessageEvent) =>
      connection.onData(
        CloudflareWorkerRpcWebSocketClientAdapter.toBytes(event.data),
      ),
    );

    // NOTE(review): no "open" listener is registered; the socket is treated
    // as ready as soon as accept() has been called — confirm against the
    // Workers WebSocket documentation.
    connection.onConnected({
      clientAddress: () => this.getURL(),
      send: (message: Uint8Array) => ws.send(message),
      close: () => ws.close(),
    });
  }
}
// Cloudflare Worker <-> Cloudflare Worker
/**
 * RPC client whose transport is a `CloudflareWorkerRpcWebSocketClientAdapter`
 * built from the given options.
 */
export class CloudflareWorkerRpcWebSocketClient extends RpcClient {
  /**
   * @param options URL string or `Response` forwarded to the transport adapter.
   */
  constructor(options: CloudflareWorkerRpcWebSocketClientOptions) {
    const adapter = new CloudflareWorkerRpcWebSocketClientAdapter(options);
    super(adapter);
  }

  /**
   * Creates a client that targets the host of `request`.
   *
   * The scheme is `wss` when the request URL's protocol starts with `https`,
   * otherwise `ws`. A non-empty `baseUrl` lacking a leading slash gets one
   * prepended before it is appended to the host.
   *
   * @param request Request whose URL supplies protocol and host.
   * @param baseUrl Optional path appended after the host.
   * @returns An instance of the class this static method was called on.
   */
  static fromCurrentRequest<T extends ClassType<RpcClient>>(
    this: T,
    request: Request,
    baseUrl: string = '',
  ): InstanceType<T> {
    const { protocol, host } = new URL(request.url);

    let scheme = 'ws';
    if (protocol.startsWith('https')) {
      scheme = 'wss';
    }

    const needsSlash = baseUrl.length > 0 && baseUrl[0] !== '/';
    const path = needsSlash ? '/' + baseUrl : baseUrl;

    const Ctor = this as any;
    return new Ctor(`${scheme}://${host}${path}`);
  }
}
import { createRpcConnection } from '@deepkit/framework';
import { FetchRequestHandlerOptions } from './types';
import { RpcKernel } from '@deepkit/rpc';
import { InjectorContext } from '@deepkit/injector';
/**
 * Accepts a WebSocket upgrade request and bridges the server end of a new
 * `WebSocketPair` to an RPC connection created from the app's `RpcKernel`
 * and `InjectorContext`.
 *
 * @param options.request Incoming request; its `Upgrade` header must be
 *   `websocket` (compared case-insensitively).
 * @param options.app App used to resolve `RpcKernel` and `InjectorContext`.
 * @returns A 426 response when the request is not a WebSocket upgrade,
 *   otherwise a 101 response carrying the client end of the pair.
 */
export function webSocketFetchRequestHandler<M>({
  request,
  app,
}: FetchRequestHandlerOptions<M>): Response {
  // The Upgrade token is matched case-insensitively so that e.g.
  // `Upgrade: WebSocket` is not rejected.
  if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') {
    return new Response('Expected WebSocket', { status: 426 });
  }

  // Converts message data to bytes. Strings (text frames) are UTF-8 encoded;
  // handing a string to `new Uint8Array(...)` would interpret it as a length
  // (e.g. "5" -> five zero bytes) and silently corrupt the payload.
  const toBytes = (data: unknown): Uint8Array => {
    if (typeof data === 'string') return new TextEncoder().encode(data);
    if (ArrayBuffer.isView(data)) {
      return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
    }
    return new Uint8Array(data as ArrayBuffer);
  };

  // To accept the WebSocket request, we create a WebSocketPair (which is like a socket pair,
  // i.e. two WebSockets that talk to each other), we return one end of the pair in the
  // response, and we operate on the other end. Note that this API is not part of the
  // Fetch API standard; unfortunately, the Fetch API / Service Workers specs do not define
  // any way to act as a WebSocket server today.
  const pair = new WebSocketPair();
  const [client, server] = Object.values(pair);

  // @ts-ignore
  server.accept();

  const rpcKernel = app.get(RpcKernel);
  const injectorContext = app.get(InjectorContext);

  const connection = createRpcConnection(injectorContext, rpcKernel, {
    close: () => server.close(),
    write: (buffer: Uint8Array) => server.send(buffer),
    clientAddress: () => {
      // Get the client's IP address
      const ip = request.headers.get('CF-Connecting-IP');
      if (!ip) throw new Error('No IP address');
      return ip;
    },
  });

  server.addEventListener('close', () => connection.close());
  server.addEventListener('message', (event: MessageEvent) =>
    connection.feed(toBytes(event.data)),
  );

  // Now we return the other end of the pair to the client.
  return new Response(null, { status: 101, webSocket: client });
}
import type { App } from '@deepkit/app';
/**
 * Arguments accepted by the fetch request handler.
 */
export interface FetchRequestHandlerOptions<M> {
/** Application instance the handler resolves its services from. */
readonly app: App<M>;
/** The incoming request being handled. */
readonly request: Request;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment