Skip to content

Instantly share code, notes, and snippets.

@CGamesPlay
Created March 26, 2021 01:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CGamesPlay/62e2e06d12f5713179b54af2e03a32b3 to your computer and use it in GitHub Desktop.
Save CGamesPlay/62e2e06d12f5713179b54af2e03a32b3 to your computer and use it in GitHub Desktop.
import { Conn, Transport, Message, RPCMessage, Client } from "capnp-ts";
class AsyncQueue<T> {
waitingData: T[] = [];
waitingReaders: ((m: T) => void)[] = [];
push(val: T) {
if (this.waitingReaders.length > 0) {
this.waitingReaders.shift()!(val);
} else {
this.waitingData.push(val);
}
}
shift(): Promise<T> {
if (this.waitingData.length > 0) {
return Promise.resolve(this.waitingData.shift()!);
} else {
return new Promise((resolve) => {
this.waitingReaders.push(resolve);
});
}
}
}
class WSTransport implements Transport {
ready: Promise<unknown>;
messages = new AsyncQueue<RPCMessage>();
constructor(private socket: WebSocket) {
if (socket.readyState === 1) {
this.ready = Promise.resolve(undefined);
} else {
this.ready = new Promise((resolve, reject) => {
socket.addEventListener("open", () => {
resolve(undefined);
});
socket.addEventListener("error", (e) => {
reject(new Error("socket error"));
});
});
}
socket.addEventListener("error", (e) => {
this.ready = Promise.reject(new Error("socket error"));
});
socket.addEventListener("message", async (e) => {
const buffer = await (e.data as Blob).arrayBuffer();
const msg = new Message(buffer, false);
const rpcMsg = msg.getRoot(RPCMessage);
this.messages.push(rpcMsg);
});
}
async sendMessage(msg: RPCMessage) {
await this.ready;
this.socket.send(msg.segment.message.toArrayBuffer());
}
async recvMessage(): Promise<RPCMessage> {
return this.messages.shift();
}
async close() {
await this.ready.catch(() => {});
this.socket.close();
}
}
type Finalizer = () => void;
class WeakWrapper {
nextId = 0;
mapping: { [key: string]: Finalizer } = {};
registry = new FinalizationRegistry((key: string) => {
const f = this.mapping[key];
delete this.mapping[key];
if (f) f();
});
weak = (obj: any, f: Finalizer) => {
this.nextId++;
const key = this.nextId.toString();
this.mapping[key] = f;
this.registry.register(obj, key);
};
}
export function connect(url: string): Promise<Client> {
const socket = new WebSocket(url);
const transport = new WSTransport(socket);
const conn = new Conn(transport, new WeakWrapper().weak);
return conn.bootstrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment