Skip to content

Instantly share code, notes, and snippets.

@jcoglan
Last active October 11, 2015 21:54
Show Gist options
  • Save jcoglan/a1b1be5b82b2122a1df5 to your computer and use it in GitHub Desktop.
Save jcoglan/a1b1be5b82b2122a1df5 to your computer and use it in GitHub Desktop.
'use strict';
var crypto = require('crypto'),
http = require('http'),
net = require('net'),
protocol = require('./protocol');
// HTTP server that only reacts to 'upgrade' requests. For every upgraded
// connection it decodes incoming frames and echoes each message back through
// an encoder onto the same socket, logging raw input and message sizes.
var httpServer = http.createServer();

httpServer.on('upgrade', function onUpgrade(request, socket, body) {
  var encoder = new protocol.Encode();
  var decoder = new protocol.Decode();

  // Log every raw chunk arriving on the upgraded socket.
  socket.on('data', function logRawInput(chunk) {
    console.log('[http input] ', chunk);
  });

  // Log the size of each complete decoded message.
  decoder.on('data', function logMessage(message) {
    console.log('[http message] ', message.length);
  });

  // Echo path: decoded message -> encoder -> socket.
  decoder.pipe(encoder).pipe(socket);

  // Feed the `body` argument handed to the 'upgrade' handler first, then
  // everything that subsequently arrives on the socket.
  decoder.write(body);
  socket.pipe(decoder);
});

httpServer.listen(4007);
/**
 * Connects to localhost:`port`, sends an HTTP upgrade request, then
 * ping-pongs framed random messages: one message is sent up front, and every
 * decoded reply triggers another random message of the same size.
 *
 * @param {number} port - TCP port of the server to connect to.
 */
function testServer(port) {
  var messageSize = Math.pow(2, 14);
  var socket = net.connect(port, 'localhost');

  // Upgrade request, written one header line per socket.write() call.
  [
    'GET / HTTP/1.1\r\n',
    'Connection: Upgrade\r\n',
    'Upgrade: echo\r\n',
    '\r\n'
  ].forEach(function(line) {
    socket.write(line);
  });

  var encoder = new protocol.Encode();
  var decoder = new protocol.Decode();

  // Each decoded reply is logged and answered with a fresh random message.
  decoder.on('data', function onReply(message) {
    console.log('[client message] ', message.length);
    encoder.write(crypto.randomBytes(messageSize));
  });

  socket.pipe(decoder);
  encoder.pipe(socket);

  // Kick off the exchange with the first message.
  encoder.write(crypto.randomBytes(messageSize));
}

testServer(4007);
'use strict';
var crypto = require('crypto'),
net = require('net'),
protocol = require('./protocol');
// Plain TCP echo server: frames arriving on each connection are decoded,
// logged, re-encoded and written back to the same socket.
var tcpServer = net.createServer();

tcpServer.on('connection', function onConnection(socket) {
  var encoder = new protocol.Encode();
  var decoder = new protocol.Decode();

  // Log every raw chunk arriving on the connection.
  socket.on('data', function logRawInput(chunk) {
    console.log('[tcp input] ', chunk);
  });

  // Log the size of each complete decoded message.
  decoder.on('data', function logMessage(message) {
    console.log('[tcp message] ', message.length);
  });

  // Echo path: socket -> decoder -> encoder -> socket.
  decoder.pipe(encoder).pipe(socket);
  socket.pipe(decoder);
});

tcpServer.listen(4007);
/**
 * Connects to localhost:`port` over plain TCP and ping-pongs framed random
 * messages: one message is sent up front, and every decoded reply triggers
 * another random message of the same size.
 *
 * @param {number} port - TCP port of the server to connect to.
 */
function testServer(port) {
  var messageSize = Math.pow(2, 14);
  var socket = net.connect(port, 'localhost');

  var encoder = new protocol.Encode();
  var decoder = new protocol.Decode();

  // Each decoded reply is logged and answered with a fresh random message.
  decoder.on('data', function onReply(message) {
    console.log('[client message] ', message.length);
    encoder.write(crypto.randomBytes(messageSize));
  });

  socket.pipe(decoder);
  encoder.pipe(socket);

  // Kick off the exchange with the first message.
  encoder.write(crypto.randomBytes(messageSize));
}

testServer(4007);
'use strict';
var Stream = require('stream').Stream,
util = require('util');
/**
 * Minimal legacy-style duplex stream built on the old `Stream` base class.
 * It only tracks a paused flag and the readable/writable markers; subclasses
 * supply `write()` and emit 'data' themselves.
 */
var Base = function() {
  Stream.call(this);
  this.readable = this.writable = true;
  this._paused = false;
};
util.inherits(Base, Stream);

/**
 * Marks the stream as paused. Subclass `write()` implementations report this
 * to the writer by returning `!this._paused`.
 */
Base.prototype.pause = function() {
  this._paused = true;
};

/**
 * Clears the paused flag and emits 'drain' so that an upstream source that
 * was told to wait may start writing again.
 */
Base.prototype.resume = function() {
  this._paused = false;
  this.emit('drain');
};

/**
 * Ends the stream. `pipe()` calls `end()` on its destination once the source
 * ends, so without this method a closing source makes pipe() throw a
 * TypeError. Calling `end()` more than once is a no-op.
 *
 * @param {*} [chunk] - optional final chunk, forwarded to `write()` when the
 *   stream has one.
 */
Base.prototype.end = function(chunk) {
  if (!this.writable) return;
  if (chunk !== undefined && chunk !== null && typeof this.write === 'function') {
    this.write(chunk);
  }
  this.readable = this.writable = false;
  // Lets streams piped from this one end their own destination in turn.
  this.emit('end');
};
/**
 * Stream that decodes length-prefixed frames. Each frame is a 4-byte
 * big-endian unsigned length followed by that many body bytes; every complete
 * body is emitted as one 'data' event.
 */
var Decode = function() {
  Base.call(this);
  this._stage = 'parselength'; // 'parselength' or 'readbody'
  this._length = null;         // body length of the frame being read
  this._queue = [];            // received chunks not yet consumed
  this._amount = 0;            // total number of bytes held in _queue
};
util.inherits(Decode, Base);

/**
 * Buffers `chunk` and emits a 'data' event for every frame that is now
 * complete. Partial frames stay queued until more bytes arrive.
 *
 * @param {Buffer} chunk - bytes received from the wire.
 * @returns {boolean} false when the stream is not writable or is paused,
 *   true otherwise.
 */
Decode.prototype.write = function(chunk) {
  if (!this.writable) return false;
  this._queue.push(chunk);
  this._amount += chunk.length;
  var buf;
  while (true) {
    if (this._stage === 'parselength') {
      buf = this._readBytes(4);
      if (!buf) break;
      this._length = buf.readUInt32BE(0);
      this._stage = 'readbody';
    }
    if (this._stage === 'readbody') {
      buf = this._readBytes(this._length);
      if (!buf) break;
      this._stage = 'parselength';
      this.emit('data', buf);
    }
  }
  return !this._paused;
};

/**
 * Removes exactly `size` bytes from the front of the queue.
 *
 * Bytes are taken by slicing the head chunk, so the unread remainder of a
 * large chunk is never copied; a copy is made only when the requested range
 * spans several queued chunks.
 *
 * @param {number} size - number of bytes to take.
 * @returns {Buffer|null} the bytes, or null when fewer than `size` bytes are
 *   queued (the queue is then left untouched).
 */
Decode.prototype._readBytes = function(size) {
  if (size > this._amount) return null;
  var parts = [],
      needed = size,
      head;
  while (needed > 0 && this._queue.length > 0) {
    head = this._queue[0];
    if (head.length <= needed) {
      // The whole head chunk is consumed.
      this._queue.shift();
      parts.push(head);
      needed -= head.length;
    } else {
      // Only a prefix is consumed; keep the rest as a view, not a copy.
      parts.push(head.slice(0, needed));
      this._queue[0] = head.slice(needed);
      needed = 0;
    }
  }
  this._amount -= size - needed;
  return parts.length === 1 ? parts[0] : Buffer.concat(parts);
};
/**
 * Stream that frames each written chunk by prefixing it with its byte length
 * as a 4-byte big-endian unsigned integer.
 */
var Encode = function() {
  Base.call(this);
};
util.inherits(Encode, Base);

/**
 * Emits the 4-byte length header followed by the payload, as two separate
 * 'data' events.
 *
 * @param {Buffer|string} chunk - message to frame. Strings are converted to
 *   a Buffer first so that the header carries the byte length rather than
 *   the character count.
 * @returns {boolean} false when the stream is not writable or is paused,
 *   true otherwise.
 */
Encode.prototype.write = function(chunk) {
  if (!this.writable) return false;
  var payload = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
  var lengthHeader = Buffer.alloc(4);
  lengthHeader.writeUInt32BE(payload.length, 0);
  this.emit('data', lengthHeader);
  this.emit('data', payload);
  return !this._paused;
};
// Public API of this module: the frame decoder and encoder constructors.
module.exports = { Decode: Decode, Encode: Encode };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment