Skip to content

Instantly share code, notes, and snippets.

@jamesbirtles
Created August 25, 2019 14:38
Show Gist options
  • Save jamesbirtles/d0677d5b70b164e6564e391c213568a1 to your computer and use it in GitHub Desktop.
Save jamesbirtles/d0677d5b70b164e6564e391c213568a1 to your computer and use it in GitHub Desktop.
Jayson WebSocket server and client
import {
Client,
ClientOptions,
Utils,
UtilsJSONStringifyOptions,
RequestParamsLike,
JSONRPCResultLike
} from "jayson";
import { promisify, isString, isError } from "util";
import WS from "ws";
/** Signature of the promise-returning form of jayson's `Utils.JSON.stringify`. */
type StringifyAsync = (
  obj: object,
  options: UtilsJSONStringifyOptions | null | undefined
) => Promise<string>;

/**
 * Promise-based wrapper around the callback-style `Utils.JSON.stringify`.
 *
 * The `any` cast discards the signature inferred by `promisify` so that the
 * explicit `StringifyAsync` type above is the one callers see.
 */
const stringify: StringifyAsync = promisify(Utils.JSON.stringify) as any;
/**
 * Construction options for `WebSocketClient`: everything jayson's
 * `ClientOptions` accepts, plus the address of the server to connect to.
 */
export interface WebSocketClientOptions extends ClientOptions {
  /** Address passed to the `ws` constructor when the client is created. */
  url: string;
}
/**
 * Jayson JSON-RPC client that exchanges messages over a single WebSocket.
 *
 * The socket is opened in the constructor. Requests issued while it is still
 * connecting are queued and flushed once it opens; responses are matched to
 * their request through the JSON-RPC `id`.
 *
 * Events emitted on the client:
 * - `"open"`  — the underlying socket has connected.
 * - `"error"` — a non-string frame was received or a frame failed to parse.
 *
 * Note: a request without an `id` member is sent, but its callback is only
 * invoked if sending fails, because no response can be matched to it.
 */
export class WebSocketClient extends Client {
  /** Underlying `ws` socket, created in the constructor. */
  public socket: WS;

  /** Requests issued before the socket opened, waiting to be sent. */
  private queue = new Set<{ request: object; callback: RequestCallback }>();

  /** Callbacks of sent requests still awaiting a response, keyed by request id. */
  private requests = new Map<any, RequestCallback>();

  /**
   * @param options jayson client options plus the `url` to connect to.
   */
  constructor(public options: WebSocketClientOptions) {
    super(options);
    this.socket = new WS(options.url);

    this.socket.on("open", () => {
      this.emit("open");
      // Snapshot and clear first so that anything re-queued while flushing
      // is not processed twice.
      const messages = Array.from(this.queue);
      this.queue.clear();
      for (const { request, callback } of messages) {
        // _request reports failures through the callback; nothing to await.
        void this._request(request, callback);
      }
    });

    this.socket.on("close", () => {
      // No response can arrive any more: settle everything that is waiting
      // instead of leaving callers hanging forever.
      this.failPending(new Error("WebSocket connection closed"));
    });

    this.socket.on("message", data => {
      // Plain typeof check instead of the deprecated util.isString helper.
      if (typeof data !== "string") {
        this.emit(
          "error",
          new Error("Non-string message received from server")
        );
        return;
      }
      Utils.JSON.parse(data, this.options, (err, message) => {
        if (err) {
          this.emit("error", err);
          return;
        }
        if (!hasID(message)) {
          return;
        }
        const callback = this.requests.get(message.id);
        if (!callback) {
          // Response for an unknown (or already settled) request: ignore.
          return;
        }
        this.requests.delete(message.id);
        if (isRPCError(message)) {
          callback(message.error);
        } else {
          callback(null, message);
        }
      });
    });
  }

  /**
   * Promise-based variant of `request`.
   *
   * @param method Name of the remote method.
   * @param params Positional parameters forwarded to the remote method.
   * @returns A promise resolving with the `result` member of the response.
   * @throws {JSONRPCError} (as a rejection) when the server answers with a
   *   JSON-RPC error object.
   * @throws {Error} (as a rejection) when the request fails locally, e.g.
   *   serialization fails or the socket is closed. Such errors are passed
   *   through unchanged so their message and stack are preserved.
   */
  public requestAsync<R = any>(method: string, ...params: any[]): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      this.request(method, params, (err: any, res: any) => {
        if (err) {
          // Only plain JSON-RPC error objects are wrapped; wrapping a real
          // Error would serialize it to "{}" and lose its message.
          reject(err instanceof Error ? err : new JSONRPCError(err));
          return;
        }
        resolve(res.result);
      });
    });
  }

  /**
   * Sends one serialized request over the socket.
   *
   * While the socket is connecting the request is queued; once the socket is
   * closing or closed the callback is failed immediately, since the queue is
   * only flushed on "open".
   *
   * @param request  JSON-RPC request object to send.
   * @param callback Invoked with the response or error for this request.
   */
  private async _request(request: object, callback: RequestCallback) {
    const state = this.socket.readyState;
    if (state === ReadyState.Connecting) {
      this.queue.add({ request, callback });
      return;
    }
    if (state !== ReadyState.Open) {
      callback(new Error("WebSocket is not open"));
      return;
    }

    let registered = false;
    try {
      const body = await stringify(request, this.options);
      // The socket may have closed while stringify was pending.
      if (this.socket.readyState !== ReadyState.Open) {
        throw new Error("WebSocket is not open");
      }
      if (hasID(request)) {
        this.requests.set(request.id, callback);
        registered = true;
      }
      this.socket.send(body);
    } catch (err) {
      // Do not leave a stale entry behind for a request that was never sent.
      if (registered && hasID(request)) {
        this.requests.delete(request.id);
      }
      callback(err);
    }
  }

  /**
   * Fails every queued and in-flight request with `reason` and forgets them.
   */
  private failPending(reason: Error): void {
    const callbacks = Array.from(this.requests.values()).concat(
      Array.from(this.queue, entry => entry.callback)
    );
    // Clear before invoking so callbacks that issue new requests see a
    // consistent state.
    this.requests.clear();
    this.queue.clear();
    for (const callback of callbacks) {
      callback(reason);
    }
  }
}
/**
 * Node-style completion callback for a request: `err` is set on failure,
 * otherwise `response` carries the reply (when there is one).
 */
type RequestCallback = (
  err: any,
  response?: any
) => void;
/**
 * Connection states that the socket's numeric `readyState` is compared
 * against. Values are spelled out so the mapping does not depend on
 * member order.
 */
enum ReadyState {
  Connecting = 0,
  Open = 1,
  Closing = 2,
  Closed = 3
}
/** Any JSON-RPC message (request or response) that carries an `id` member. */
interface RPCWithID {
  id: any;
}

/**
 * Type guard: does `a` carry an `id` member?
 *
 * Safe to call with any value. Primitives, `null` and `undefined` yield
 * `false` instead of tripping the `in` operator, which throws a TypeError
 * when its right-hand side is not an object.
 *
 * @param a Value to inspect, typically a freshly parsed JSON message.
 * @returns `true` only when `a` is a non-null object with an `id` property.
 */
function hasID(a: unknown): a is RPCWithID {
  return typeof a === "object" && a !== null && "id" in a;
}
/** Shape of a JSON-RPC response that reports a failure. */
interface RPCWithError {
  jsonrpc: string;
  id: string;
  error: { code: number; message: string };
}

/**
 * Type guard: does `a` carry an `error` member?
 *
 * Only the presence of the `error` property is checked, not its shape.
 * Safe to call with any value: primitives, `null` and `undefined` yield
 * `false` instead of making the `in` operator throw a TypeError.
 *
 * @param a Value to inspect, typically a freshly parsed JSON message.
 * @returns `true` only when `a` is a non-null object with an `error` property.
 */
function isRPCError(a: unknown): a is RPCWithError {
  return typeof a === "object" && a !== null && "error" in a;
}
/** Shape of a JSON-RPC response that reports success. */
interface RPCWithResult {
  jsonrpc: string;
  id: string;
  result: any;
}

/**
 * Type guard: does `a` carry a `result` member?
 *
 * Only the presence of the `result` property is checked, not its value.
 * Safe to call with any value: primitives, `null` and `undefined` yield
 * `false` instead of making the `in` operator throw a TypeError.
 *
 * @param a Value to inspect, typically a freshly parsed JSON message.
 * @returns `true` only when `a` is a non-null object with a `result` property.
 */
function isRPCResult(a: unknown): a is RPCWithResult {
  return typeof a === "object" && a !== null && "result" in a;
}
/**
 * Error raised when the server answers a request with a JSON-RPC error
 * object. The original error object is kept on `error`, and its JSON form is
 * embedded in `message`.
 */
export class JSONRPCError extends Error {
  /**
   * @param error The `error` member of the JSON-RPC response.
   */
  constructor(public error: RPCWithError["error"]) {
    super("JSONRPC Error: " + JSON.stringify(error));
    // Restore the prototype chain so `instanceof JSONRPCError` keeps working
    // when the code is compiled to a target where extending Error breaks it.
    Object.setPrototypeOf(this, new.target.prototype);
    // Identify the subclass in logs and `toString()` instead of a bare "Error".
    this.name = "JSONRPCError";
  }
}
import { Server as WsServer, ServerOptions as WSServerOptions } from "ws";
import {
Server,
Utils,
UtilsJSONParseOptions,
UtilsJSONStringifyOptions
} from "jayson";
import { isString } from "lodash";
/**
 * Construction options for `WebSocketServer`: jayson's JSON parse and
 * stringify options, plus the settings for the underlying `ws` server.
 */
export interface WebSocketServerOptions
  extends UtilsJSONParseOptions,
    UtilsJSONStringifyOptions {
  /** Options passed to the `ws` server constructor. */
  wss: WSServerOptions;
}
/**
 * Jayson JSON-RPC server that serves requests over WebSocket connections.
 *
 * Each promise-returning entry of `methods` is adapted to jayson's
 * callback-style handler signature and invoked with the server as `this`.
 * Every text frame received from a client is parsed, dispatched through
 * `this.call`, and the outcome (if any) is sent back on the same connection.
 */
export class WebSocketServer extends Server {
  /** Underlying `ws` server, created in the constructor. */
  private wss: WsServer;

  /**
   * @param methods Map of method name to promise-returning implementation.
   * @param options JSON parse/stringify options plus the `wss` settings used
   *   to create the `ws` server.
   */
  constructor(
    methods: { [methodName: string]: AsyncMethod },
    options: WebSocketServerOptions
  ) {
    super(
      // Build the handler table in place; spreading the accumulator on every
      // step would copy it once per method.
      Object.keys(methods).reduce<
        Record<
          string,
          (params: any, callback: (err: any, result?: any) => void) => void
        >
      >((handlers, name) => {
        handlers[name] = (params, callback) => {
          // Positional params are spread; a named-params object is handed
          // over as a single argument (passing it straight to `apply` would
          // silently drop it); absent params mean no arguments.
          const args =
            params == null ? [] : Array.isArray(params) ? params : [params];
          let pending: Promise<any>;
          try {
            pending = Promise.resolve(methods[name].apply(this, args));
          } catch (err) {
            // The implementation threw before returning a promise.
            callback(err);
            return;
          }
          pending.then(res => callback(null, res), err => callback(err));
        };
        return handlers;
      }, {}),
      options
    );

    this.wss = new WsServer(options.wss);
    this.wss.on("connection", client => {
      // Serializes `data` and writes it to this connection. Failures are
      // logged rather than thrown: the peer may already have gone away by
      // the time an asynchronous method completes.
      const send = (data: object) => {
        Utils.JSON.stringify(data, options, (err, str) => {
          if (err) {
            console.error("Failed to stringify message", err);
            return;
          }
          client.send(str, sendErr => {
            if (sendErr) {
              console.error("Failed to send message", sendErr);
            }
          });
        });
      };

      client.on("message", data => {
        // Plain typeof check; no helper library needed for this.
        if (typeof data !== "string") {
          send(this.error(400, "Only string messages are supported"));
          return;
        }
        Utils.JSON.parse(data, options, (err, message) => {
          if (err) {
            send(this.error(400, "Failed to parse message JSON"));
            return;
          }
          this.call(message as any, (error: any, success: object) => {
            const response = error || success;
            if (!response) {
              // Don't need to respond to notifications
              return;
            }
            send(response);
          });
        });
      });
    });
  }
}
/**
 * Signature of a method exposed through `WebSocketServer`: it is invoked
 * with the server instance as `this` and reports its outcome through the
 * returned promise.
 */
export type AsyncMethod = (this: WebSocketServer, ...args: any[]) => Promise<any>;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment