Skip to content

Instantly share code, notes, and snippets.

@dshvedchenko
Created March 12, 2017 20:50
Show Gist options
  • Save dshvedchenko/5883fb93459f3c82ce33b73381f8fdaf to your computer and use it in GitHub Desktop.
Save dshvedchenko/5883fb93459f3c82ce33b73381f8fdaf to your computer and use it in GitHub Desktop.
// @flow
import { Readable } from 'stream';
import net from 'net';
export default class RequestStream extends Readable {
socket: net.Socket;
buffer: Buffer;
httpTerm: Buffer;
headers: {};
headersReady: boolean;
method: string;
proto: string;
url: string;
chunkNum: number;
chunks: Array < Buffer > ;
headersHandlers: {};
contentLength: number;
bytesRead: number;
constructor(socket: net.Socket, options: ? Object) {
if (options !== null) { super(options); } else { super() };
this.headersHandlers = {
'content-length': this.setContentLength.bind(this),
}
this.bytesRead = 0;
this.chunks = [];
this.chunkNum = 0;
this.buffer = Buffer.alloc(0);
this.httpTerm = Buffer.from('\r\n\r\n');
this.headersReady = false;
this.socket = socket;
socket.on('data', (data: Buffer) => {
if (!this.headersReady) {
this.buffer = Buffer.concat([this.buffer, data]);
const termPos = this.buffer.indexOf(this.httpTerm);
if (termPos > 0) {
this.headersReady = true;
this.socket.pause();
const headerBuff = this.buffer.slice(0, termPos);
const tBuf: Buffer = this.buffer.slice(termPos + 4);
this.parseHeader(headerBuff);
// this.socket.unshift(tBuf);
this.chunks.push(tBuf);
this.emit('headers', this);
}
} else {
this.chunks.push(data);
this.socket.pause();
}
});
socket.on('end', () => this.push(null));
socket.on('error', (err: Error) => {
this.emit('error', err);
})
}
_read() {
if (this.chunks.length > 0) {
this.push(Buffer.concat(this.chunks));
this.chunks = [];
}
this.socket.resume();
}
push(data: any): boolean {
let res = false;
if (data !== null) {
this.bytesRead += data.length;
res = super.push(data);
if (res && this.bytesRead >= this.contentLength) res = super.push(null);
} else {
return super.push(null);
}
return res;
}
setContentLength(len: string) {
this.contentLength = +len;
}
parseHeader(data: Buffer): void {
let headerBlock: Array < string > = data.toString('utf-8').split('\r\n');
const [method, url, proto] = headerBlock[0].split(' ');
headerBlock = headerBlock.splice(1);
const headers = headerBlock.filter((line) => line.length > 0).map((line) => {
const ix = line.indexOf(': ');
const res = {
name: line.substring(0, ix),
value: line.substring(ix + 2)
};
const customHandler = this.headersHandlers[res.name.toLowerCase()];
if (customHandler !== undefined) customHandler(res.value);
return res;
});
this.method = method;
this.url = url;
this.proto = proto;
this.headers = headers.reduce((p, header) => {
const res = p;
res[header.name] = header.value;
return p;
}, {});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment