Created
March 12, 2017 20:50
-
-
Save dshvedchenko/5883fb93459f3c82ce33b73381f8fdaf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
import { Readable } from 'stream'; | |
import net from 'net'; | |
export default class RequestStream extends Readable { | |
socket: net.Socket; | |
buffer: Buffer; | |
httpTerm: Buffer; | |
headers: {}; | |
headersReady: boolean; | |
method: string; | |
proto: string; | |
url: string; | |
chunkNum: number; | |
chunks: Array < Buffer > ; | |
headersHandlers: {}; | |
contentLength: number; | |
bytesRead: number; | |
constructor(socket: net.Socket, options: ? Object) { | |
if (options !== null) { super(options); } else { super() }; | |
this.headersHandlers = { | |
'content-length': this.setContentLength.bind(this), | |
} | |
this.bytesRead = 0; | |
this.chunks = []; | |
this.chunkNum = 0; | |
this.buffer = Buffer.alloc(0); | |
this.httpTerm = Buffer.from('\r\n\r\n'); | |
this.headersReady = false; | |
this.socket = socket; | |
socket.on('data', (data: Buffer) => { | |
if (!this.headersReady) { | |
this.buffer = Buffer.concat([this.buffer, data]); | |
const termPos = this.buffer.indexOf(this.httpTerm); | |
if (termPos > 0) { | |
this.headersReady = true; | |
this.socket.pause(); | |
const headerBuff = this.buffer.slice(0, termPos); | |
const tBuf: Buffer = this.buffer.slice(termPos + 4); | |
this.parseHeader(headerBuff); | |
// this.socket.unshift(tBuf); | |
this.chunks.push(tBuf); | |
this.emit('headers', this); | |
} | |
} else { | |
this.chunks.push(data); | |
this.socket.pause(); | |
} | |
}); | |
socket.on('end', () => this.push(null)); | |
socket.on('error', (err: Error) => { | |
this.emit('error', err); | |
}) | |
} | |
_read() { | |
if (this.chunks.length > 0) { | |
this.push(Buffer.concat(this.chunks)); | |
this.chunks = []; | |
} | |
this.socket.resume(); | |
} | |
push(data: any): boolean { | |
let res = false; | |
if (data !== null) { | |
this.bytesRead += data.length; | |
res = super.push(data); | |
if (res && this.bytesRead >= this.contentLength) res = super.push(null); | |
} else { | |
return super.push(null); | |
} | |
return res; | |
} | |
setContentLength(len: string) { | |
this.contentLength = +len; | |
} | |
parseHeader(data: Buffer): void { | |
let headerBlock: Array < string > = data.toString('utf-8').split('\r\n'); | |
const [method, url, proto] = headerBlock[0].split(' '); | |
headerBlock = headerBlock.splice(1); | |
const headers = headerBlock.filter((line) => line.length > 0).map((line) => { | |
const ix = line.indexOf(': '); | |
const res = { | |
name: line.substring(0, ix), | |
value: line.substring(ix + 2) | |
}; | |
const customHandler = this.headersHandlers[res.name.toLowerCase()]; | |
if (customHandler !== undefined) customHandler(res.value); | |
return res; | |
}); | |
this.method = method; | |
this.url = url; | |
this.proto = proto; | |
this.headers = headers.reduce((p, header) => { | |
const res = p; | |
res[header.name] = header.value; | |
return p; | |
}, {}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment