Created
March 26, 2021 01:58
-
-
Save CGamesPlay/62e2e06d12f5713179b54af2e03a32b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Conn, Transport, Message, RPCMessage, Client } from "capnp-ts"; | |
class AsyncQueue<T> { | |
waitingData: T[] = []; | |
waitingReaders: ((m: T) => void)[] = []; | |
push(val: T) { | |
if (this.waitingReaders.length > 0) { | |
this.waitingReaders.shift()!(val); | |
} else { | |
this.waitingData.push(val); | |
} | |
} | |
shift(): Promise<T> { | |
if (this.waitingData.length > 0) { | |
return Promise.resolve(this.waitingData.shift()!); | |
} else { | |
return new Promise((resolve) => { | |
this.waitingReaders.push(resolve); | |
}); | |
} | |
} | |
} | |
class WSTransport implements Transport { | |
ready: Promise<unknown>; | |
messages = new AsyncQueue<RPCMessage>(); | |
constructor(private socket: WebSocket) { | |
if (socket.readyState === 1) { | |
this.ready = Promise.resolve(undefined); | |
} else { | |
this.ready = new Promise((resolve, reject) => { | |
socket.addEventListener("open", () => { | |
resolve(undefined); | |
}); | |
socket.addEventListener("error", (e) => { | |
reject(new Error("socket error")); | |
}); | |
}); | |
} | |
socket.addEventListener("error", (e) => { | |
this.ready = Promise.reject(new Error("socket error")); | |
}); | |
socket.addEventListener("message", async (e) => { | |
const buffer = await (e.data as Blob).arrayBuffer(); | |
const msg = new Message(buffer, false); | |
const rpcMsg = msg.getRoot(RPCMessage); | |
this.messages.push(rpcMsg); | |
}); | |
} | |
async sendMessage(msg: RPCMessage) { | |
await this.ready; | |
this.socket.send(msg.segment.message.toArrayBuffer()); | |
} | |
async recvMessage(): Promise<RPCMessage> { | |
return this.messages.shift(); | |
} | |
async close() { | |
await this.ready.catch(() => {}); | |
this.socket.close(); | |
} | |
} | |
type Finalizer = () => void; | |
class WeakWrapper { | |
nextId = 0; | |
mapping: { [key: string]: Finalizer } = {}; | |
registry = new FinalizationRegistry((key: string) => { | |
const f = this.mapping[key]; | |
delete this.mapping[key]; | |
if (f) f(); | |
}); | |
weak = (obj: any, f: Finalizer) => { | |
this.nextId++; | |
const key = this.nextId.toString(); | |
this.mapping[key] = f; | |
this.registry.register(obj, key); | |
}; | |
} | |
export function connect(url: string): Promise<Client> { | |
const socket = new WebSocket(url); | |
const transport = new WSTransport(socket); | |
const conn = new Conn(transport, new WeakWrapper().weak); | |
return conn.bootstrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment