Skip to content

Instantly share code, notes, and snippets.

@lap00zza lap00zza/nanoWs-client.js
Last active Oct 29, 2018

Embed
What would you like to do?
A barebones WebSocket client for nodejs
/**
* nanoWS
* A very basic websocket client. It does not fully cover rfc6455, so
* it must not be used for any real-world work. I wrote it to find out
* how websockets actually work.
*
* @licence MIT
* @author Jewel Mahanta <jewelmahanta@gmail.com>
*/
const crypto = require("crypto");
const { URL } = require("url");
const https = require("https");
const http = require("http");
const { EventEmitter } = require("events");
// const zlib = require("zlib");
// const { StringDecoder } = require("string_decoder");
// ERRORS
/**
 * Thrown when a URL whose scheme is not one of https:, http:, wss: or ws:
 * is used.
 */
class ProtocolError extends Error {
  /**
   * @param {String} [message] - Override for the default explanation.
   */
  constructor(message = "only https, http, wss and ws are supported") {
    // Hand the message to Error itself: the `stack` text is captured while
    // the Error constructor runs, so assigning `this.message` afterwards
    // would leave the stack trace without any explanation.
    super(message);
    this.name = "ProtocolError";
  }
}
///////////////////////
// OPCODE Reference //
///////////////////////
/**
 * Frame opcodes (low nibble of the first header byte) used by this client.
 * Frozen so that no caller can accidentally remap an opcode at runtime.
 */
// prettier-ignore
const OPCODE = Object.freeze({
  CONTINUE: 0x0,
  TEXT    : 0x1,
  BINARY  : 0x2,
  CLOSE   : 0x8,
  PING    : 0x9,
  PONG    : 0xa
});
/**
 * Numeric connection states, in life-cycle order.
 * Frozen so the table cannot be modified at runtime.
 *
 * NOTE(review): in the code visible here the WebSocket class tracks its
 * state with the state *name* (e.g. "OPEN") rather than these numbers.
 */
const SOCKET_STATE = Object.freeze({
  CONNECTING: 0,
  OPEN: 1,
  CLOSING: 2,
  CLOSED: 3
});
/**
 * Splits the two fixed header bytes of a websocket frame into their fields,
 * following [rfc6455](https://tools.ietf.org/html/rfc6455#section-5.2).
 * The three reserved bits of the first byte are ignored.
 *
 * @param {Buffer} header - The first 2 bytes of a frame
 * @returns {{FIN: Boolean, OPCODE: Number, MASK: Boolean, LEN: Number}}
 *   FIN and MASK as booleans, OPCODE as the low 4 bits of byte 1 and LEN
 *   as the low 7 bits of byte 2.
 */
// prettier-ignore
const getFrameInfo = (header) => {
  const firstByte = header[0];
  const secondByte = header[1];

  return {
    FIN: (firstByte & 0x80) !== 0,   // top bit of byte 1
    OPCODE: firstByte & 0x0f,        // low nibble of byte 1
    MASK: (secondByte & 0x80) !== 0, // top bit of byte 2
    LEN: secondByte & 0x7f           // remaining 7 bits of byte 2
  };
};
/**
 * XOR-masks a payload with a freshly generated random 4-byte key, as
 * described in [rfc6455](https://tools.ietf.org/html/rfc6455#section-5.3).
 * The input is left untouched; a new buffer is returned.
 *
 * @param {Buffer} payload
 * @returns {{payload: Buffer, key: Buffer}} the masked copy and the key used
 */
const maskPayload = payload => {
  const key = crypto.randomBytes(4);
  const masked = Buffer.alloc(payload.length);

  let index = 0;
  while (index < payload.length) {
    // the key repeats every 4 bytes
    masked[index] = payload[index] ^ key[index % 4];
    index += 1;
  }

  return { payload: masked, key };
};
/**
 * Builds a single, final (FIN = 1), masked websocket frame.
 *
 * Frame layout (in bytes):
 *   2  fixed header (fin + opcode, mask bit + 7-bit length)
 *   then one of:
 *     - nothing                 if length <= 125
 *     - 2  extended length      if 125 < length <= 65535  (7-bit length = 126)
 *     - 8  extended length      if length > 65535         (7-bit length = 127)
 *   4  masking key
 *   n  masked payload
 *
 * All lengths are measured in BYTES of the encoded payload, not in string
 * characters, so multi-byte UTF-8 text is framed correctly.
 *
 * @param {String | Buffer} data - payload; strings are encoded as UTF-8
 * @param {OPCODE} [type=0x1] - frame opcode (defaults to TEXT)
 * @returns {Buffer} the complete frame, ready to be written to the socket
 */
const encodeFrame = (data, type = 0x1) => {
  // Convert first: a string's `.length` counts UTF-16 units, not bytes.
  const payload = Buffer.isBuffer(data) ? data : Buffer.from(data);
  const byteLength = payload.length;

  let header;
  if (byteLength <= 0x7d /* 125 */) {
    header = Buffer.alloc(2);
    header[1] = 0b10000000 | byteLength;
  } else if (byteLength <= 0xffff /* 65535 */) {
    header = Buffer.alloc(2 + 2);
    header[1] = 0b10000000 | 126;
    header.writeUInt16BE(byteLength, 2);
  } else {
    header = Buffer.alloc(2 + 8);
    header[1] = 0b10000000 | 127;
    // 64-bit big-endian length written as two 32-bit halves, because
    // bitwise operators in JS only work on 32 bits.
    header.writeUInt32BE(Math.floor(byteLength / 0x100000000), 2);
    header.writeUInt32BE(byteLength % 0x100000000, 6);
  }
  // FIN bit + opcode; the opcode is clamped to its 4-bit field so it can
  // never spill into the reserved bits.
  header[0] = 0b10000000 | (type & 0b00001111);

  const masked = maskPayload(payload);
  return Buffer.concat([header, masked.key, masked.payload]);
};
/**
 * A minimal, event-driven WebSocket client.
 *
 * Events emitted:
 *  - "open"              the HTTP upgrade succeeded
 *  - "message" (String)  a TEXT frame was received
 *  - "close"             the underlying socket closed
 *  - "error"   (Error)   the handshake request or the socket failed
 *
 * Known limitations: BINARY and CONTINUE frames are parsed but not
 * dispatched, frames flagged as masked are not unmasked, and the server's
 * Sec-WebSocket-Accept header is not verified.
 */
class WebSocket extends EventEmitter {
  /**
   * @param {String} url the websocket url
   * @param {Boolean} [debug=false] log raw socket events
   * @throws {ProtocolError} when the url scheme is not https/http/wss/ws
   */
  constructor(url, debug = false) {
    const _url = new URL(url);
    if (!["https:", "http:", "wss:", "ws:"].includes(_url.protocol))
      throw new ProtocolError();
    // The handshake is a plain HTTP(S) request, so map ws(s) onto http(s).
    if (_url.protocol === "wss:") _url.protocol = "https:";
    if (_url.protocol === "ws:") _url.protocol = "http:";
    super();
    this.url = _url;
    this.debug = debug;
    this._socket = undefined;
    this.httpModule = _url.protocol === "https:" ? https : http;
    this.SOCKET_STATE = "CONNECTING";
    // Parser state is initialised BEFORE the request is started so that no
    // handler can ever observe a half-constructed instance.
    this._processing = false;
    this._buffers = []; // chunks received but not yet looked at
    this._remaining = Buffer.alloc(0); // bytes pulled from _buffers, not yet consumed
    this._bufferedBytes = 0; // _remaining.length + total length of _buffers
    // Current frame. It is an instance variable because a frame can span
    // several chunks.
    this._frame = {};
    this.getSocket();
  }

  /**
   * Request options carrying the handshake headers.
   * @returns {{headers: Object}}
   */
  static getHTTPHeaders() {
    return {
      headers: {
        Upgrade: "websocket",
        Connection: "Upgrade",
        // RFC 6455 section 4.1: a random 16-byte nonce, base64-encoded.
        "Sec-WebSocket-Key": crypto.randomBytes(16).toString("base64"),
        "Sec-WebSocket-Version": 13
      }
    };
  }

  /**
   * A websocket starts off as a normal HTTP request. This is the
   * handshake part. Once our connection is upgraded, the actual
   * data transfer can start.
   *
   * NOTE: the request is not chained because the stack trace gets messy.
   */
  getSocket() {
    const request = this.httpModule.request(
      this.url,
      WebSocket.getHTTPHeaders()
    );
    // The server answered without upgrading; dump what it said.
    request.on("response", res => {
      if (this.debug) console.log("::response::");
      res.pipe(process.stdout);
    });
    // This is what we are interested in. Remember
    // socket (net.Socket) is a Duplex stream.
    request.on("upgrade", (res, socket, head) => {
      if (this.debug) console.log("::upgrade::");
      if (this.debug) console.log("\x1b[35m%s\x1b[0m", "::head::", head);
      this._socket = socket;
      this.attachSocketEvents();
      this.SOCKET_STATE = "OPEN";
      this.emit("open");
      // `head` holds any frame bytes that arrived together with the
      // handshake response.
      if (head.length > 0) {
        this._buffers.push(head);
        this._bufferedBytes += head.length;
        this.processBuffers();
      }
    });
    // Surface failures through the emitter. Throwing from inside this
    // listener could never be caught by the caller.
    request.on("error", e => {
      this.SOCKET_STATE = "CLOSED";
      this.emit("error", e);
    });
    request.end();
  }

  /**
   * We finally got our socket. Wire up the listeners that feed incoming
   * bytes to the frame parser and track the socket's life cycle.
   * @see handleClose
   */
  attachSocketEvents() {
    this._socket.on("data", chunk => {
      if (this.debug) console.log("\x1b[35m%s\x1b[0m", "::raw::", chunk);
      this._buffers.push(chunk);
      this._bufferedBytes += chunk.length;
      this.processBuffers();
    });
    this._socket.on("error", e => {
      this.emit("error", e);
    });
    this._socket.on("close", () => {
      if (this.debug) console.log("\x1b[31m%s\x1b[0m", "::closed::");
      this.SOCKET_STATE = "CLOSED";
      this.emit("close");
    });
  }

  /**
   * Consumes exactly `n` bytes from the buffered input.
   * @param {Number} n number of bytes wanted
   * @returns {Buffer | false} the bytes, or `false` when fewer than `n`
   *   bytes (or no bytes at all) are currently buffered; nothing is
   *   consumed in that case.
   */
  getBytes(n) {
    if (this.debug) console.log("\x1b[31m%s\x1b[0m", `::GET:: ${n} bytes`);
    // Cheap early exit: avoids re-concatenating on every chunk while a
    // large frame is still arriving.
    if (this._bufferedBytes === 0 || this._bufferedBytes < n) return false;
    if (this._remaining.length < n) {
      const parts = [this._remaining];
      let gathered = this._remaining.length;
      while (gathered < n && this._buffers.length > 0) {
        const next = this._buffers.shift();
        parts.push(next);
        gathered += next.length;
      }
      this._remaining = Buffer.concat(parts, gathered);
      if (gathered < n) return false;
    }
    this._bufferedBytes -= n;
    const bytes = this._remaining.subarray(0, n);
    this._remaining = this._remaining.subarray(n);
    return bytes;
  }

  /**
   * Parses as many complete frames as the buffered bytes allow and
   * dispatches each one to `handleFrame`. Parsing is resumable: a frame
   * that is only partially available stays in `this._frame` until the
   * next chunk arrives.
   */
  processBuffers() {
    if (this._processing) return; // already parsing further up the stack
    this._processing = true;
    if (this.debug)
      console.log("\x1b[32m%s\x1b[0m", "::processing frames::");
    try {
      let bytes;
      while (true) {
        ///////////////////////
        // Get Frame Headers //
        ///////////////////////
        // FIN is used as the marker for "header already parsed".
        if (!("FIN" in this._frame)) {
          if (!(bytes = this.getBytes(2))) break;
          this._frame = getFrameInfo(bytes);
          if (this.debug) console.log("HEAD", this._frame);
        }
        //////////////////////////////
        // Calculate Payload Length //
        //////////////////////////////
        if (!("PAYLOAD_LENGTH" in this._frame)) {
          if (this._frame.LEN <= 125) {
            this._frame.PAYLOAD_LENGTH = this._frame.LEN;
          } else if (this._frame.LEN === 126) {
            // next 2 bytes are the length (unsigned, big-endian)
            if (!(bytes = this.getBytes(2))) break;
            this._frame.PAYLOAD_LENGTH = bytes.readUInt16BE(0);
          } else {
            // next 8 bytes are the length (unsigned, big-endian). Read as
            // two 32-bit halves since JS bitwise operators are 32-bit only.
            if (!(bytes = this.getBytes(8))) break;
            this._frame.PAYLOAD_LENGTH =
              bytes.readUInt32BE(0) * 0x100000000 + bytes.readUInt32BE(4);
          }
          if (this.debug) console.log("LEN", this._frame);
        }
        //////////////////////////////////////////
        // Add Payload information and dispatch //
        //////////////////////////////////////////
        if (this._frame.PAYLOAD_LENGTH === 0) {
          // Nothing to read; still give the frame a (empty) payload so
          // every handler can rely on PAYLOAD being a Buffer.
          bytes = Buffer.alloc(0);
        } else if (!(bytes = this.getBytes(this._frame.PAYLOAD_LENGTH))) {
          break;
        }
        const frame = this._frame;
        frame.PAYLOAD = bytes;
        // Reset BEFORE dispatching so a throwing listener cannot leave a
        // stale frame behind.
        this._frame = {};
        this.handleFrame(frame);
      }
    } finally {
      this._processing = false;
    }
  }

  /**
   * Acts on one fully received frame.
   * @param {Object} frame as produced by `processBuffers`
   */
  handleFrame(frame) {
    if (this.debug) console.log("\x1b[34m%s\x1b[0m", "::FRAME::", frame);
    const payload = frame.PAYLOAD || Buffer.alloc(0);
    switch (frame.OPCODE) {
      // NOTE: StringDecoder will be useful once CONTINUE is supported
      case OPCODE.TEXT:
        this.handleTextData(payload.toString());
        break;
      case OPCODE.CLOSE:
        if (this.SOCKET_STATE === "CLOSING") {
          // This is the answer to the close frame we sent earlier: the
          // closing handshake is complete, so finish our side of the socket.
          this._socket.end();
        } else {
          this.handleClose();
        }
        break;
      case OPCODE.PING:
        if (this.debug) console.log("::PING::");
        // https://tools.ietf.org/html/rfc6455#section-5.5.2
        // A ping is answered with a pong echoing the same payload.
        if (this.SOCKET_STATE === "OPEN")
          this._socket.write(encodeFrame(payload, OPCODE.PONG));
        break;
    }
  }

  /**
   * Emits a received text message.
   * @param {String} text
   */
  handleTextData(text) {
    if (this.debug)
      console.log("\x1b[33m%s\x1b[0m", "::emit message::", text);
    this.emit("message", text);
  }

  /**
   * Sends a close frame (once) and moves to the CLOSING state. Calling it
   * again while closing or closed does nothing.
   * @throws {Error} when the connection has not been established yet
   */
  handleClose() {
    if (this.SOCKET_STATE === "CLOSING" || this.SOCKET_STATE === "CLOSED")
      return;
    if (!this._socket) throw new Error("Websocket needs to be open.");
    if (this.debug) console.log("\x1b[31m%s\x1b[0m", "::closing:: 0x8");
    this.SOCKET_STATE = "CLOSING";
    // Frames sent by a client must be masked (rfc6455 section 5.1), so the
    // close frame goes through encodeFrame like any other frame.
    this._socket.write(encodeFrame(Buffer.alloc(0), OPCODE.CLOSE));
  }

  /**
   * Sends a text message.
   * @param {String} data
   * @throws {Error} when the socket is not open
   * @throws {TypeError} when `data` is not a string
   * @todo add support for binary data
   */
  send(data) {
    if (this.SOCKET_STATE !== "OPEN")
      throw new Error("Websocket needs to be open.");
    if (typeof data !== "string")
      throw new TypeError("data must be string");
    this._socket.write(encodeFrame(data, OPCODE.TEXT));
  }

  /**
   * Starts the closing handshake.
   * @see handleClose
   */
  close() {
    this.handleClose();
  }
}
// The WebSocket client class is this module's export.
module.exports = WebSocket;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.