Skip to content

Instantly share code, notes, and snippets.

@intrnl
Created January 9, 2021 10:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save intrnl/e249d4deeafeafa2fb055038bbbaecc0 to your computer and use it in GitHub Desktop.
Save intrnl/e249d4deeafeafa2fb055038bbbaecc0 to your computer and use it in GitHub Desktop.
import { getPlatformInfo } from '../utils/platform';
import { noop } from '../utils/util';
import * as http from '../lib/http';
import { GATEWAY_VERSION } from '../constants/endpoints';
import { GATEWAY } from '../constants/api';
import { GatewayOP, ConnectionStatus } from '../constants/gateway';
import { Inflate } from '@intrnl/pako-esm/inflate';
import { Z_SYNC_FLUSH } from '@intrnl/pako-esm/constants';
export interface ClientOpts {
token: string;
onConnectionChange?: ((status: ConnectionStatus) => void);
onConnectionClose?: ((code: number, event: CloseEvent) => void);
onGatewayEvent?: ((type: string, data: any) => void);
gatewayURL?: string;
}
export class Client {
opts: ClientOpts;
status = ConnectionStatus.DISCONNECTED;
sessionID?: number;
sequence?: number;
private _zlib?: Inflate;
private _ws?: WebSocket;
_heartbeatInterval?: number;
lastHeartbeatAck = true;
lastHeartbeatReceived?: number;
lastHeartbeatSent?: number;
onConnectionChange: ((status: ConnectionStatus) => void);
onConnectionClose: ((code: number, event: CloseEvent) => void);
onGatewayEvent: ((type: string, data: any) => void);
constructor (opts: ClientOpts) {
this.opts = { ...opts };
this.onConnectionChange = opts.onConnectionChange ?? noop;
this.onConnectionClose = opts.onConnectionClose ?? noop;
this.onGatewayEvent = opts.onGatewayEvent ?? noop;
}
async connect () {
this._changeConnectionStatus(ConnectionStatus.CONNECTING);
if (!this.opts.gatewayURL)
await this._getGateway();
console.debug('[CLIENT]', 'Instantiating zlib context');
this._zlib = new Inflate({ chunkSize: 65536, to: 'string' });
let gatewayURL = this.opts.gatewayURL;
gatewayURL += `?v=${GATEWAY_VERSION}`;
gatewayURL += '&compress=zlib-stream';
gatewayURL += '&encoding=json';
console.debug('[CLIENT]', 'Opening a connection to gateway');
this._ws = new WebSocket(gatewayURL);
this._ws.binaryType = 'arraybuffer';
this._ws.onopen = this._onWSOpen.bind(this);
this._ws.onclose = this._onWSClose.bind(this);
this._ws.onmessage = this._onWSMessage.bind(this);
this._ws.onerror = this._onWSError.bind(this);
}
async disconnect () {
this._changeConnectionStatus(ConnectionStatus.DISCONNECTED);
clearInterval(this._heartbeatInterval);
if (this._ws !== undefined) {
this._ws.onclose = undefined;
this._ws.close(1000);
this._ws = undefined;
}
this.sessionID = undefined;
this.sequence = undefined;
}
async reconnect () {
this._changeConnectionStatus(ConnectionStatus.DISCONNECTED);
clearInterval(this._heartbeatInterval);
if (this._ws !== undefined) {
this._ws.onclose = undefined;
this._ws.close(4901);
this._ws = undefined;
}
this.connect();
}
private async _getGateway () {
console.debug('[CLIENT]', 'Discovering gateway');
let { data } = await http.get(GATEWAY).json();
return this.opts.gatewayURL = data.url;
}
private _onWSOpen () {
console.debug('[CLIENT]', 'Gateway connection opened');
this._changeConnectionStatus(ConnectionStatus.CONNECTED);
}
private _onWSClose (message: CloseEvent) {
console.debug('[CLIENT]', 'Gateway connection closed');
this._changeConnectionStatus(ConnectionStatus.DISCONNECTED);
let reconnect = true;
switch (message.code) {
case 4004: { // Authentication fail
reconnect = false;
}
case 4007: { // Invalid sequence
this.sessionID = undefined;
this.sequence = undefined;
break;
}
}
this._notifyConnectionClose(message.code, message);
if (reconnect) this.reconnect();
}
private _onWSError (message: Event) {
console.debug('[CLIENT]', 'WebSocket error');
console.debug(message);
}
private _onWSMessage (message: MessageEvent<ArrayBuffer>) {
let buffer = message.data;
let view = new DataView(buffer);
let length = view.byteLength;
let flush = length >= 4 && view.getUint32(length - 4, false) === 65535;
this._zlib.push(buffer, flush && Z_SYNC_FLUSH);
if (!flush) return;
let json = this._zlib.result.toString();
let payload = JSON.parse(json);
let { op, s: seq, t: type, d: data } = payload;
this.sequence = seq;
switch (op) {
case GatewayOP.EVENT: {
this._onWSEvent(type, data);
break;
}
case GatewayOP.HELLO: {
console.debug('[CLIENT]', 'HELLO message received from gateway');
if (data.heartbeat_interval > 0) {
if (this._heartbeatInterval)
clearInterval(this._heartbeatInterval);
// @ts-ignore: blame @types/node
this._heartbeatInterval = setInterval(() => (
this._sendHeartbeat(true)
), data.heartbeat_interval);
}
if (this.sessionID) {
this._resumeSession();
} else {
this._identifySession();
}
break;
}
case GatewayOP.INVALID_SESSION: {
console.debug('[CLIENT]', 'Invalid session, reidentifying');
this.sequence = undefined;
this.sessionID = undefined;
this._identifySession();
break;
}
case GatewayOP.RECONNECT: {
console.debug('[CLIENT]', 'Gateway asked for reconnection');
this.reconnect();
break;
}
case GatewayOP.HEARTBEAT: {
console.debug('[CLIENT]', 'Gateway requested manual heartbeat');
this._sendHeartbeat();
break;
}
case GatewayOP.HEARTBEAT_ACK: {
console.debug('[CLIENT]', 'Heartbeat acknowledged by gateway');
this.lastHeartbeatAck = true;
this.lastHeartbeatReceived = Date.now();
break;
}
default: {
console.debug('[CLIENT]', `Unhandled op ${op}`);
}
}
}
private _onWSEvent (type: string, data: any) {
switch (type) {
case 'RESUMED':
case 'READY': {
this._changeConnectionStatus(ConnectionStatus.READY);
this.sessionID = data.session_id;
break;
}
}
this._notifyGatewayEvent(type, data);
}
private _sendWSMessage (op: GatewayOP, data: any) {
if (!this._ws || this.status < ConnectionStatus.CONNECTED) return;
let payload = JSON.stringify({ op, d: data });
this._ws.send(payload);
}
private _identifySession () {
let data = {
token: this.opts.token,
capabilities: 1,
compress: true,
presence: {
status: 'online',
since: 0,
afk: false,
activities: [],
},
properties: getPlatformInfo(),
};
console.debug('[CLIENT]', 'Identifying session');
this._changeConnectionStatus(ConnectionStatus.IDENTIFYING);
this._sendWSMessage(GatewayOP.IDENTIFY, data);
}
private _resumeSession () {
let data = {
token: this.opts.token,
session_id: this.sessionID,
seq: this.sequence,
};
console.debug('[CLIENT]', 'Resuming session');
this._changeConnectionStatus(ConnectionStatus.RESUMING);
this._sendWSMessage(GatewayOP.RESUME, data);
}
private _sendHeartbeat (normal?: boolean) {
if (normal) {
if (!this.lastHeartbeatAck) {
console.debug('[CLIENT]', 'Last heartbeat not acknowledged, reconnecting');
this.reconnect();
return;
}
console.debug('[CLIENT]', 'Sending normal heartbeat');
this.lastHeartbeatAck = false;
}
this.lastHeartbeatSent = Date.now();
this._sendWSMessage(GatewayOP.HEARTBEAT, this.sequence);
}
private _changeConnectionStatus (status: ConnectionStatus) {
console.debug('[CLIENT]', `Connection status is now ${ConnectionStatus[status]}`);
if (this.status !== status) {
try {
this.onConnectionChange(this.status);
} catch {}
}
this.status = status;
}
private _notifyConnectionClose (code: number, event: CloseEvent) {
try {
this.onConnectionClose(code, event);
} catch {}
}
private _notifyGatewayEvent (type: string, data: any) {
try {
this.onGatewayEvent(type, data);
} catch {}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment