Skip to content

Instantly share code, notes, and snippets.

@garthk
Created August 4, 2012 02:02
Show Gist options
  • Save garthk/3253513 to your computer and use it in GitHub Desktop.
Save garthk/3253513 to your computer and use it in GitHub Desktop.
Using bouncy and mux-demux to make a proxy server
var http = require('http'),
net = require('net'),
bouncy = require('bouncy'),
MuxDemux = require('mux-demux');
// Port mux_server listens on; the trampoline bounces CONNECT requests here.
var mux_server_port = 8192;
// Port the bouncy trampoline listens on; both mux_client's CONNECT and the
// example GET request are sent to this port.
var trampoline_port = 8193;
// Port target_server listens on; mux_client opens a socket to it for each
// new mux stream.
var target_server_port = 8194;
// Server-side MuxDemux instance. Assigned in mux_server's 'connect' handler
// and read by the trampoline when it creates a stream for a non-CONNECT request.
var mux;
/**
 * Print one structured log record as a single line of JSON.
 *
 * @param {*} arg - value to serialise with JSON.stringify and print.
 */
function log(arg) {
  var line = JSON.stringify(arg);
  console.log(line);
}
// mux_server takes connections from mux_client via trampoline.
// Ordinary HTTP requests are refused with 403; only CONNECT is expected.
var mux_server = http.createServer(function(req, res) {
  log({ what: 'mux-request' });
  res.writeHead(403);
  res.end("not expecting requests");
});
mux_server.listen(mux_server_port);
mux_server.on('connect', function(req, socket, head) {
  log({ what: 'mux-connect' });
  socket.write('HTTP/1.1 200 Connection Established\r\n\r\n');
  // Every CONNECT replaces the shared `mux`, so the trampoline always
  // creates streams on the most recently connected tunnel.
  mux = MuxDemux();
  socket.pipe(mux).pipe(socket);
  // `head` holds any tunnel bytes that arrived in the same packet as the
  // CONNECT request. Feed them to the mux the same way the pipe would,
  // otherwise the start of the multiplexed stream is silently dropped.
  if (head && head.length) {
    mux.write(head);
  }
});
// trampoline takes connections from either mux_client or HTTP
// client, redirecting to mux_server on CONNECT or mux_client
// (via new mux stream) on other methods e.g. GET.
var trampoline = bouncy(function(req, bounce) {
  log({ what: 'trampoline-request', method: req.method, path: req.url });
  if (req.method === 'CONNECT') {
    log({ what: 'trampoline-redirect-to-mux-server' });
    bounce(mux_server_port);
    return;
  }
  if (!mux) {
    // `mux` is only assigned once a mux_client has completed its CONNECT.
    // Before that there is nowhere to send the request, and calling
    // mux.createStream would throw a TypeError and take the process down.
    // Answer with 503 instead. bounce.respond() is taken from the bouncy
    // README; confirm it exists in the installed bouncy version.
    log({ what: 'trampoline-no-mux-client' });
    var res = bounce.respond();
    res.statusCode = 503;
    res.end("no mux client connected");
    return;
  }
  log({ what: 'trampoline-redirect-via-mux-server' });
  // The request line and headers travel as stream metadata so the far end
  // can decide where to route the request.
  var meta = { method: req.method, url: req.url, headers: req.headers };
  var stream = mux.createStream(meta);
  bounce(stream);
});
trampoline.listen(trampoline_port);
// I'd hoped that by setting the socket's encoding, the data events
// from the bounced stream would contain strings instead of Buffers
// and avoid the problem in mux-demux. No such luck:
// NOTE(review): if bouncy returns a plain net.Server, the event that
// carries the socket is 'connection', not 'connect', so this listener
// may never fire. Confirm against the installed bouncy version.
trampoline.on('connect', function(socket) {
  socket.setEncoding('utf8');
});
// target_server stands in for whichever backend mux_client would pick
// from the request metadata bouncy supplied to the trampoline. For
// this example we simply host one ourselves, answering every request
// with a fixed 200 response.
var target_server = http.createServer();
target_server.on('request', function(request, response) {
  log({ what: 'target-request' });
  response.writeHead(200);
  response.end("BURMA!"); // compulsory Python reference
});
target_server.listen(target_server_port);
// Options for the CONNECT request that opens the tunnel to mux_server
// through the trampoline port.
var mux_client_opts = {
  host: 'localhost',
  port: trampoline_port,
  method: 'CONNECT',
  path: 'compulsory?'
};
// mux_client routes the original HTTP request to a target server.
var mux_client = http.request(mux_client_opts);
mux_client.on('connect', function(res, socket, head) {
  log({ what: 'mux-client-connected' });
  // Client-side end of the multiplexer; deliberately a local, distinct
  // from the server-side `mux` shared with the trampoline.
  var client_mux = MuxDemux();
  socket.pipe(client_mux).pipe(socket);
  client_mux.on('connection', function(stream) {
    log({ what: 'new-mux-channel', meta: stream.meta });
    var sock_opts = { host: 'localhost', port: target_server_port };
    var sock = net.connect(sock_opts, function() {
      log({ what: 'mux-client-connected-to-target' });
    });
    // Wire the pipes immediately rather than in the connect callback.
    // The mux stream can start emitting the bounced request before the
    // TCP connection is up, and data emitted with no listener attached is
    // lost. Writes to a socket that is still connecting are queued by net.
    sock.pipe(stream).pipe(sock);
  });
  // Any tunnel bytes that arrived together with the CONNECT response are
  // in `head`. Hand them to the mux only after the 'connection' listener
  // is attached, so a stream announced in them is not missed.
  if (head && head.length) {
    client_mux.write(head);
  }
  // Just for this example, we'll have mux_client make a request
  // to the trampoline to exercise the whole chain once we establish
  // the mux-client-to-mux-server connection.
  log({ what: 'client-making-request-to-trampoline' });
  var opts = {
    host: 'localhost',
    port: trampoline_port,
    method: 'GET',
    path: '/fnord' // compulsory Illuminati reference
  };
  var client_request = http.request(opts, function(res) {
    log({ what: 'client-got-results-via-trampoline',
          status: res.statusCode });
    res.setEncoding('utf8');
    var content = "";
    res.on('data', function(chunk) { content += chunk; });
    res.on('end', function() {
      log({ what: 'client-finished-getting-results',
            content: content });
    });
  });
  client_request.end();
});
log({ what: 'mux-client-opens-connection-to-mux-server-via-trampoline' });
mux_client.end(); // kicks it all off
{"what":"mux-client-opens-connection-to-mux-server-via-trampoline"}
{"what":"trampoline-request","method":"CONNECT","path":"compulsory?"}
{"what":"trampoline-redirect-to-mux-server"}
{"what":"mux-connect"}
{"what":"mux-client-connected"}
{"what":"client-making-request-to-trampoline"}
{"what":"trampoline-request","method":"GET","path":"/fnord"}
{"what":"trampoline-redirect-via-mux-server"}
{"what":"new-mux-channel","meta":{"method":"GET","url":"/fnord","headers":{"host":"localhost:8193","connection":"keep-alive"}}}
{"what":"mux-client-connected-to-target"}
net.js:503
throw new TypeError('First argument must be a buffer or a string.');
^
TypeError: First argument must be a buffer or a string.
at Socket.write (net.js:503:11)
at Stream.ondata (stream.js:38:26)
at Stream.EventEmitter.emit (events.js:88:17)
at Stream.<anonymous> (/Users/garth/hg/asupnode/node_modules/mux-demux/index.js:30:14)
at Stream.stream.write (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/node_modules/through/index.js:22:11)
at Stream.ondata (stream.js:38:26)
at Stream.EventEmitter.emit (events.js:88:17)
at Stream.es.parse (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/index.js:497:12)
at Stream.stream.write (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/node_modules/through/index.js:22:11)
at Stream.ondata (stream.js:38:26)
var net = require('net'),
MuxDemux = require('mux-demux'),
util = require('util');
// Port the destination TCP server binds to.
var destPort = 8192;

// Handle one inbound connection: send a single Buffer payload, then
// close our side, logging each step.
function on_dest_connection(conn) {
  console.log("destination server got connection");
  var payload = new Buffer("fnord", 'utf8');
  conn.write(payload);
  console.log("wrote data");
  conn.end();
  console.log("called stream.end");
  console.log("mystery: why does it take so long?");
}

// Destination server that the server-side mux connects to.
var dest = net.createServer(on_dest_connection);
dest.listen(destPort, function() {
  console.log("server bound");
});
// Two MuxDemux endpoints in the same process, piped back to back so
// that whatever one emits is fed to the other.
var serverMux = MuxDemux();
var clientMux = MuxDemux();
clientMux.pipe(serverMux);
serverMux.pipe(clientMux);

// For each stream opened from the client side, connect to the
// destination server and join the two once the TCP connection is up.
serverMux.on('connection', function(stream) {
  console.log("mux got connection");
  var upstream = net.connect(destPort, function() {
    stream.pipe(upstream);
    upstream.pipe(stream);
  });
});
// Open a read stream through the mux and report what type each chunk
// arrives as: Buffer, string, some other object, or anything else.
console.log("opening read stream...");
clientMux.createReadStream('test').on('data', function(data) {
  if (Buffer.isBuffer(data)) {
    console.log("received buffer containing:", data.toString());
  } else if (typeof data === 'string') {
    console.log("received string:", data);
  } else if (data !== null && typeof data === 'object') {
    // `typeof null` is also 'object', and Object.keys(null) throws, so
    // null is excluded here and reported by the final branch instead.
    console.log(util.format(
      "received non-buffer (%s) with keys: %s",
      typeof data,
      Object.keys(data)));
  } else {
    console.log("received unexpected", typeof data);
  }
}).on('end', function() {
  // Wrap process.exit so an argument passed with the 'end' event can
  // never be taken as the exit code.
  process.exit();
});
diff --git a/index.js b/index.js
index 2e0d007..422dd0f 100644
--- a/index.js
+++ b/index.js
@@ -26,9 +26,10 @@ function MuxDemux (opts) {
s.paused = false
if(p) s.emit('drain')
}
- else {
+ else if (event === 'bufdata')
+ s.emit('data', new Buffer(data[1], 'utf8'));
+ else
s.emit.apply(s, data)
- }
})
function destroyAll (_err) {
@@ -59,7 +60,10 @@ function MuxDemux (opts) {
var s = es.through(function (data) {
if(!this.writable)
throw new Error('stream is not writable')
- md.emit('data', [s.id, 'data', data])
+ if (Buffer.isBuffer(data))
+ md.emit('data', [s.id, 'bufdata', data.toString('utf8')])
+ else
+ md.emit('data', [s.id, 'data', data])
}, function () {
md.emit('data', [s.id, 'end'])
if (this.readable && !opts.allowHalfOpen && !this.ended) {
@garthk
Copy link
Author

garthk commented Aug 4, 2012

Let's say I have a web service on target, and a client that needs to make a request to it but can't reach target directly. To broker the connection, I have nodes next to both of them: target-neighbour and client-neighbour. The constraint is that I can only open connections in one direction, from target-neighbour to client-neighbour, and only on a single port.

So, I deploy mux_client on target-neighbour. It connects to mux_server running on client-neighbour via the trampoline service running on the only port we can use. When client hits the trampoline port, bouncy opens a new stream via mux-demux and throws the request down it. When mux_client sees the new stream, it opens a connection to target and throws the stream at it. In theory, our HTTP connection just got through.

In practice, I get an exception thrown at net.js:503 because we're trying to call Socket.write with a plain JavaScript object rather than a Buffer or string. This, in turn, seems to be because bouncy is passing Buffers to mux-demux. I've tried calling setEncoding in various places in the hope of getting strings instead of Buffers. So far, I haven't succeeded.

@garthk
Copy link
Author

garthk commented Aug 4, 2012

mux_demux_buffer_demo.js shows mux-demux passing buffers to the other end as objects. I modified mux-demux to encode and decode buffers at either end. That satisfies the demo script, but not the full bouncy-based script above, which now hangs.

@garthk
Copy link
Author

garthk commented Aug 4, 2012

Aha! I wasn't calling stream.pause before opening the socket to the destination server. When I do, however, I see this:

{"what":"trampoline-request","method":"[\"F21ECB9808ACF461\",\"PAUSE\"]","path":""}

… at which point I get an entirely appropriate crash. How does pausing the mux stream cause that request to be made?

@garthk
Copy link
Author

garthk commented Aug 7, 2012

I'm getting an error event fired by req after that request hits my handler, but the first argument passed to the listener is an empty object.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment