Skip to content

Instantly share code, notes, and snippets.

@theicfire
Last active April 15, 2024 15:37
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save theicfire/b6178106786f1e49a9357110c0b4db1d to your computer and use it in GitHub Desktop.
Save theicfire/b6178106786f1e49a9357110c0b4db1d to your computer and use it in GitHub Desktop.
const { Socket } = require('net');
const { Duplex } = require('stream');
class JsonSocket extends Duplex {
/**
JsonSocket implements a basic wire-protocol that encodes/decodes
JavaScripts objects as JSON strings over the wire. The wire protocol
is defined as:
4 len - length of JSON body
len body - the JSON body encoded with minimal whitespacing
JsonSocket operates in object mode where calls to `read` and `write`
operate on JavaScript objects instead of Buffers.
@param {Socket} socket
*/
constructor(socket) {
super({ objectMode: true });
/**
True when read buffer is full and calls to `push` return false.
Additionally data will not be read off the socket until the user
calls `read`.
@private
@type {boolean}
*/
this._readingPaused = false;
/**
The underlying TCP Socket
@private
@type {Socket}
*/
this._socket;
this._currentReadingLen = 0;
this._currentBuffers = [];
// wrap the socket
if (socket) this._wrapSocket(socket);
}
bytesWaiting() {
let bufferSum = 0;
for (let i = 0; i < this._currentBuffers.length; i++) {
bufferSum += this._currentBuffers[i].length;
}
return this._currentReadingLen - bufferSum; // TODO what about negative?
}
/**
Connect to a JsonSocket server.
@param {object} param
@param {string} [param.host] the host to connect to. Default is localhost
@param {number} param.port the port to connect to. Required.
@return {JsonSocket}
*/
connect({ host, port }) {
this._wrapSocket(new Socket());
this._socket.connect('/tmp/fun.sock');
return this;
}
/**
Wraps a standard TCP Socket by binding to all events and either
rebroadcasting those events or performing custom functionality.
@private
@param {Socket} socket
*/
_wrapSocket(socket) {
this._socket = socket;
this._socket.on('close', hadError => this.emit('close', hadError));
this._socket.on('connect', () => this.emit('connect'));
this._socket.on('drain', () => this.emit('drain'));
this._socket.on('end', () => this.emit('end'));
this._socket.on('error', err => this.emit('error', err));
this._socket.on('lookup', (err, address, family, host) => this.emit('lookup', err, address, family, host)); // prettier-ignore
this._socket.on('ready', () => this.emit('ready'));
this._socket.on('timeout', () => this.emit('timeout'));
this._socket.on('readable', this._onReadable.bind(this));
}
/**
Performs data read events which are triggered under two conditions:
1. underlying `readable` events emitted when there is new data
available on the socket
2. the consumer requested additional data
@private
*/
_onReadable() {
// Read all the data until one of two conditions is met
// 1. there is nothing left to read on the socket
// 2. reading is paused because the consumer is slow
while (!this._readingPaused) {
let numBytesWaiting = this.bytesWaiting();
if (numBytesWaiting === 0) {
// First step is reading the 32-bit integer from the socket
// and if there is not a value, we simply abort processing
let lenBuf = this._socket.read(4);
if (!lenBuf) return;
// Now that we have a length buffer we can convert it
// into a number by reading the UInt32BE value
// from the buffer.
let len = lenBuf.readUInt32BE();
this._currentReadingLen = len;
numBytesWaiting = this.bytesWaiting();
}
const readableLen = this._socket.readableLength;
if (readableLen == 0) {
return;
}
if (readableLen < numBytesWaiting) {
this._currentBuffers.push(this._socket.read(readableLen));
return;
}
this._currentBuffers.push(this._socket.read(numBytesWaiting));
const body = Buffer.concat(this._currentBuffers);
this._currentReadingLen = 0;
this._currentBuffers = [];
// Try to parse the data and if it fails destroy the socket.
let json;
try {
json = JSON.parse(body);
} catch (ex) {
this._socket.destroy(ex);
return;
}
// Push the data into the read buffer and capture whether
// we are hitting the back pressure limits
let pushOk = this.push(json);
// When the push fails, we need to pause the ability to read
// messages because the consumer is getting backed up.
if (!pushOk) this._readingPaused = true;
}
}
/**
Implements the readable stream method `_read`. This method will
flagged that reading is no longer paused since this method should
only be called by a consumer reading data.
@private
*/
_read() {
this._readingPaused = false;
setImmediate(this._onReadable.bind(this));
}
/**
Implements the writeable stream method `_write` by serializing
the object and pushing the data to the underlying socket.
*/
_write(obj, encoding, cb) {
let json = JSON.stringify(obj);
let jsonBytes = Buffer.byteLength(json);
let buffer = Buffer.alloc(4 + jsonBytes);
buffer.writeUInt32BE(jsonBytes);
buffer.write(json, 4);
this._socket.write(buffer, cb);
}
/**
Implements the writeable stream method `_final` used when
.end() is called to write the final data to the stream.
*/
_final(cb) {
this._socket.end(cb);
}
}
module.exports = JsonSocket;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment