Skip to content

Instantly share code, notes, and snippets.

@isaacs
Created December 12, 2012 09:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save isaacs/4266289 to your computer and use it in GitHub Desktop.
Save isaacs/4266289 to your computer and use it in GitHub Desktop.
/* does effectively this, but without all that fancy
* stream mumbo jumbo:
var net = require('net');
var server = net.createServer(function(socket) {
socket.pipe(socket);
});
server.listen(PORT, function() {
var client = net.connect(PORT, function() {
client.on('data', function(chunk) {
console.log('CLIENT %d', chunk.length);
});
var n = 16;
client.write(new Buffer(bufSize), function w() {
if (--n > 0)
client.write(new Buffer(bufSize), w);
else
server.close();
});
});
});
*/
process.on('uncaughtException', function(e) {
console.error(e.stack || e.message);
console.error('errno = %j', global.errno);
process.exit(global.errno || 1);
});
var PORT = 1337;
var TCP = process.binding('tcp_wrap').TCP;
var writeWaterMark = 1024 * 16;
var bufSize = 1024 * 1024;
var bufCount = 16;
var totalBuf = bufSize * bufCount;
//
// SERVER
//
var serverHandle = new TCP();
// server.listen(PORT)
var r = serverHandle.bind('127.0.0.1', PORT);
if (r) {
serverHandle.close();
throw new Error('bind');
}
serverHandle.onconnection = onconnection;
var r = serverHandle.listen(511);
if (r) {
serverHandle.close();
throw new Error('listen');
}
console.log('listening');
function onconnection(clientHandle) {
console.error('onconnection');
if (!clientHandle)
throw new Error('connect');
clientHandle.server = true;
// here goes the actual connection handler stuff
connectionHandler(clientHandle);
}
function connectionHandler(clientHandle, fn) {
clientHandle.onread = onread(selfPipe)
clientHandle.pendingWrites = 0;
clientHandle.readStart();
}
// socket.pipe(socket);
function selfPipe(handle, chunk) {
var writeReq = handle.writeBuffer(chunk);
if (!writeReq || typeof writeReq !== 'object')
throw new Error('write');
writeReq.oncomplete = afterWrite(selfPipeAfterWrite);
handle.pendingWrites++;
if (handle.writeQueueSize >= writeWaterMark)
handle.readStop();
}
function selfPipeAfterWrite(handle, req) {
if (handle.writeQueueSize < writeWaterMark)
handle.readStart();
}
function onread(fn) { return function(buffer, offset, length) {
var handle = this;
if (buffer) {
if (length === 0)
return;
var d = handle.server ? 'SERVER' : 'CLIENT';
if (!handle.totalRead)
handle.totalRead = 0;
handle.totalRead += length;
console.error('%s ondata %j %j', d, length, handle.totalRead, totalBuf);
var chunk = buffer.slice(offset, offset + length);
fn(handle, chunk);
} else if (errno === 'EOF') {
console.error('%s EOF', handle.server ? 'SERVER' : 'CLIENT');
destroySoon(handle);
} else if (errno === 'ECONNRESET') {
console.error('%s ECONNRESET', handle.server ? 'SERVER' : 'CLIENT');
destroy(handle);
} else {
throw new Error('read');
}
}}
function afterWrite(fn) { return function(status, handle, req) {
if (status)
throw new Error('write');
handle.pendingWrites--;
if (handle.shutdowning)
return shutdownSoon(handle);
if (handle.destroying)
return destroySoon(handle);
// done with the write, start reading again.
fn(handle, req);
}}
function destroySoon(handle) {
handle.destroying = true;
if (handle.pendingWrites === 0)
return destroy(handle);
}
function destroy(handle) {
handle.close();
handle.onread = function() {};
if (handle.onclose)
handle.onclose();
}
var net = require('net');
var timers = require('timers');
var client = net.connect(PORT, function() {
var shutdown = client._handle.shutdown;
client._handle.shutdown = function() {
console.trace('shutdown');
return shutdown.apply(this, arguments);
};
var readStart = client._handle.readStart;
client._handle.readStart = function() {
console.error('readStart');
return readStart.apply(this, arguments);
};
var readStop = client._handle.readStop;
client._handle.readStop = function() {
console.trace('readStop');
return readStop.apply(this, arguments);
};
var close = client._handle.close;
client._handle.close = function() {
console.trace('close');
return close.apply(this, arguments);
};
var totalRead = 0;
client.on('data', function(chunk) {
totalRead += chunk.length;
console.error('CLIENT ondata %j %j',
chunk.length, totalRead, totalBuf);
});
client.on('end', function() {
console.error('CLIENT EOF');
serverHandle.close();
});
client.on('_socketEnd', function() {
console.error('CLIENT socketEnd');
console.error(client._handle);
});
client.on('close', function() {
var h = process._getActiveHandles();
var r = process._getActiveRequests();
console.error('activeHandles', h.length);
console.error('activeReqs', r.length, r);
h.forEach(function(h, i) {
console.error(i, h === client, h === serverHandle);
});
});
var n = 16;
client.write(new Buffer(bufSize), function w() {
if (--n > 0)
client.write(new Buffer(bufSize), w);
else
client.end();
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment