Skip to content

Instantly share code, notes, and snippets.

@estliberitas
Last active September 26, 2016 08:37
Show Gist options
  • Save estliberitas/6009d30c4a75a5c011a75da2d6896ee1 to your computer and use it in GitHub Desktop.
Save estliberitas/6009d30c4a75a5c011a75da2d6896ee1 to your computer and use it in GitHub Desktop.
Asynchronous parser
'use strict';
const fs = require('fs');
const random = (max) => Math.round(1 + Math.random() * (max - 1));
// Simple frame protocol:
// 4 bytes - frame length
// then read amount of bytes specified in frame length
const ws = fs.createWriteStream('parser.data');
const noOfFrames = random(100);
for (let i = 0; i < noOfFrames; i++) {
const frameLength = random(28);
const bufferSize = 4 + frameLength;
const buffer = new Buffer(bufferSize);
buffer.writeUInt32BE(frameLength);
buffer.fill(255, 4, bufferSize);
ws.write(buffer);
}
ws.end();
'use strict';
/**
* The protocol is simple:
*
* - there are frames of various length
* - first 4 bytes determine length of frame body
* - once frame length is determined, next N bytes are read as frame body
* - next frame starts right after current frame body reading is complete
*/
const EventEmitter = require('events');
const fs = require('fs');
const STATES = {
FRAME_START: 'FRAME_START',
FRAME_BODY: 'FRAME_BODY'
};
const FRAME_HEADER_SIZE = 4;
const HANDLERS = {
[STATES.FRAME_START]: (parser, buffer) => {
if (!parser.currentBuffer) {
parser.currentBuffer = new Buffer(FRAME_HEADER_SIZE);
parser.currentBufferUsed = 0;
}
const toRead = Math.min(FRAME_HEADER_SIZE - parser.currentBufferUsed, buffer.length);
buffer.copy(parser.currentBuffer, parser.currentBufferUsed, 0, toRead);
parser.currentBufferUsed += toRead;
if (parser.currentBufferUsed === FRAME_HEADER_SIZE) {
// going to the next state
parser.state = STATES.FRAME_BODY;
parser.currentFrameSize = parser.currentBuffer.readUInt32BE(0);
parser.resetCurrentBuffer();
}
return toRead;
},
[STATES.FRAME_BODY]: (parser, buffer) => {
if (!parser.currentBuffer) {
parser.currentBuffer = new Buffer(parser.currentFrameSize);
parser.currentBufferUsed = 0;
}
const toRead = Math.min(parser.currentFrameSize - parser.currentBufferUsed, buffer.length);
buffer.copy(parser.currentBuffer, parser.currentBufferUsed, 0, toRead);
parser.currentBufferUsed += toRead;
if (parser.currentBufferUsed === parser.currentFrameSize) {
parser.emit('block', parser.currentFrameSize, parser.currentBuffer);
parser.state = STATES.FRAME_START;
parser.resetCurrentBuffer();
}
return toRead;
}
};
class Parser extends EventEmitter {
constructor() {
super();
this.state = STATES.FRAME_START;
this.parsing = false;
// TODO replace with linked list
this.pendingBuffers = [];
this.resetCurrentBuffer();
}
parse() {
const handler = HANDLERS[this.state];
const buffer = this.pendingBuffers[0];
const haveRead = handler(this, buffer);
if (haveRead < buffer.length) {
this.pendingBuffers[0] = buffer.slice(haveRead);
}
else {
this.pendingBuffers.shift();
}
if (this.pendingBuffers.length) {
setImmediate(() => this.parse());
}
else {
this.parsing = false;
}
}
push(buffer) {
this.pendingBuffers.push(buffer);
if (!this.parsing) {
this.parsing = true;
this.parse();
}
}
resetCurrentBuffer() {
this.currentBuffer = null;
this.currentBufferUsed = 0;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment