Created
April 8, 2024 19:26
-
-
Save ultraviolet-jordan/165fab73fc85c4bd6faaf596ee22b0cc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export type Socket = { | |
host: string; | |
port: number; | |
}; | |
export default class ClientStream { | |
// constructor | |
private readonly socket: WebSocket; | |
private readonly wsin: WebSocketReader; | |
private readonly wsout: WebSocketWriter; | |
// runtime | |
private closed: boolean = false; | |
private ioerror: boolean = false; | |
static openSocket = async (socket: Socket): Promise<WebSocket> => { | |
return await new Promise<WebSocket>((resolve, reject): void => { | |
const secured: boolean = socket.host.startsWith('https'); | |
const protocol: string = secured ? 'wss' : 'ws'; | |
const host: string = socket.host.substring(socket.host.indexOf('//') + 2); | |
const port: number = secured ? socket.port + 2 : socket.port + 1; | |
const ws: WebSocket = new WebSocket(`${protocol}://${host}:${port}`, 'binary'); | |
ws.addEventListener('open', (): void => { | |
console.log('connection open!'); | |
resolve(ws); | |
}); | |
ws.addEventListener('error', (): void => { | |
console.log('connection error!'); | |
reject(ws); | |
}); | |
}); | |
}; | |
constructor(socket: WebSocket) { | |
socket.onclose = this.onclose; | |
socket.onerror = this.onerror; | |
this.wsin = new WebSocketReader(socket, 5000); | |
this.wsout = new WebSocketWriter(socket, 5000); | |
this.socket = socket; | |
} | |
get host(): string { | |
return this.socket.url.split('/')[2]; | |
} | |
get port(): number { | |
return parseInt(this.socket.url.split(':')[2], 10); | |
} | |
get available(): number { | |
return this.closed ? 0 : this.wsin.available; | |
} | |
write = (src: Uint8Array, len: number): void => { | |
this.wsout.write(src, len); | |
}; | |
read = async (): Promise<number> => { | |
return this.closed ? 0 : await this.wsin.read(); | |
}; | |
readBytes = async (dst: Uint8Array, off: number, len: number): Promise<void> => { | |
if (this.closed) { | |
return; | |
} | |
while (len > 0) { | |
const read: Uint8Array = await this.wsin.readBytes(dst, off, len); | |
if (read.length <= 0) { | |
throw new Error('EOF'); | |
} | |
off += read.length; | |
len -= read.length; | |
} | |
}; | |
close = (): void => { | |
this.closed = true; | |
this.socket.close(); | |
this.wsin.close(); | |
this.wsout.close(); | |
console.log('connection close!'); | |
if (this.ioerror) { | |
console.log('connection error!'); | |
} | |
}; | |
private onclose = (event: CloseEvent): void => { | |
if (this.closed) { | |
return; | |
} | |
this.close(); | |
}; | |
private onerror = (event: Event): void => { | |
if (this.closed) { | |
return; | |
} | |
this.ioerror = true; | |
this.close(); | |
}; | |
} | |
class WebSocketWriter { | |
// constructor | |
private readonly socket: WebSocket; | |
private readonly limit: number; | |
private closed: boolean = false; | |
private ioerror: boolean = false; | |
constructor(socket: WebSocket, limit: number) { | |
this.socket = socket; | |
this.limit = limit; | |
} | |
write = (src: Uint8Array, len: number): void => { | |
if (this.closed) { | |
return; | |
} | |
if (this.ioerror) { | |
this.ioerror = false; | |
throw new Error('Error in writer thread'); | |
} | |
if (len > this.limit || src.length > this.limit) { | |
throw new Error('buffer overflow'); | |
} | |
try { | |
this.socket.send(src.subarray(0, len)); | |
} catch (e) { | |
this.ioerror = true; | |
} | |
}; | |
close = (): void => { | |
this.closed = true; | |
}; | |
} | |
class WebSocketReader { | |
// constructor | |
private readonly limit: number; | |
// runtime | |
private buffer: number[] = []; | |
private callback: ((data: number[]) => void) | null = null; | |
private closed: boolean = false; | |
constructor(socket: WebSocket, limit: number) { | |
this.limit = limit; | |
socket.binaryType = 'arraybuffer'; | |
socket.onmessage = this.onmessage; | |
} | |
private onmessage = (event: MessageEvent): void => { | |
if (this.closed) { | |
throw new Error('WebSocketReader is closed!'); | |
} | |
const read: Uint8Array = new Uint8Array(event.data); | |
for (let index: number = 0; index < read.length; index++) { | |
this.buffer.unshift(read[index]); | |
} | |
if (!this.callback) { | |
return; | |
} | |
this.callback(this.buffer); | |
this.callback = null; | |
// check for the overflow after the callback | |
if (this.buffer.length > this.limit) { | |
throw new Error('buffer overflow'); | |
} | |
}; | |
readBytes = async (dst: Uint8Array, off: number, length: number): Promise<Uint8Array> => { | |
if (this.closed) { | |
throw new Error('WebSocketReader is closed!'); | |
} | |
while (this.buffer.length < length) { | |
await new Promise((resolve): ((value: PromiseLike<((data: number[]) => void) | null>) => void) => (this.callback = resolve)); | |
} | |
while (length--) { | |
const byte: number | undefined = this.buffer.pop(); | |
if (byte === undefined) { | |
throw new Error('EOF'); | |
} | |
dst[off++] = byte; | |
} | |
return dst; | |
}; | |
read = async (): Promise<number> => { | |
if (this.closed) { | |
throw new Error('WebSocketReader is closed!'); | |
} | |
while (this.buffer.length < 1) { | |
await new Promise((resolve): ((value: PromiseLike<((data: number[]) => void) | null>) => void) => (this.callback = resolve)); | |
} | |
const byte: number | undefined = this.buffer.pop(); | |
if (byte === undefined) { | |
throw new Error('EOF'); | |
} | |
return byte; | |
}; | |
get available(): number { | |
return this.buffer.length; | |
} | |
close = (): void => { | |
this.callback = null; | |
this.buffer = []; | |
this.closed = true; | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment