Skip to content

Instantly share code, notes, and snippets.

@billywhizz
Created February 2, 2011 17:49
Show Gist options
  • Save billywhizz/808061 to your computer and use it in GitHub Desktop.
Save billywhizz/808061 to your computer and use it in GitHub Desktop.
stream client and server for node.js
var net = require("net");
var crypto = require("crypto");
var fs = require("fs");
// Holds the "./multi-node" module once Server.listen() has required it;
// stays null until then.
var multi = null;

// Stream state names, exported below as `streamStates`.
var _states = {
  open: "open",
  closed: "closed",
  readOnly: "readOnly",
  writeOnly: "writeOnly",
  opening: "opening"
};

// Server state names, exported below as `serverStates`.
var _server_states = {
  listening: "listening",
  paused: "paused"
};
/**
 * Writes `data` to `connection.socket` and reports the outcome as a boolean.
 *
 * @param {Object} connection - object exposing a `socket` with a `write` method.
 * @param {*} data - payload handed straight to `socket.write`.
 * @param {string} [encoding] - encoding handed straight to `socket.write`.
 * @returns {boolean} `true` when `socket.write` returns a value greater than 0
 *   (which includes `true`); `false` otherwise or when the write throws.
 */
function writesocket(connection, data, encoding) {
  try {
    return connection.socket.write(data, encoding) > 0;
  } catch (ex) {
    // `message` and `stack` are non-enumerable on Error objects, so
    // JSON.stringify(ex) would drop them (often printing just "{}").
    // Prefer the stack when available; fall back to JSON for other values.
    if (ex && ex.stack) {
      console.log(ex.stack);
    } else {
      console.log(JSON.stringify(ex, null, "\t"));
    }
    return false;
  }
}
/**
 * Invokes every function in `queue`, in order, with `thisobj` as `this`
 * and `args` as the argument list. Does nothing when `queue` is missing
 * or empty.
 */
function callchain(queue, thisobj, args) {
  if (!queue || !queue.length) {
    return;
  }
  // Length is captured once and holes are skipped, so entries appended by a
  // handler during this pass are not run in the same pass.
  for (var index = 0, count = queue.length; index < count; index++) {
    if (index in queue) {
      queue[index].apply(thisobj, args);
    }
  }
}
/**
 * Stream client. Meant to be invoked with `this` bound to an options object
 * (e.g. `streams.client.call({...})`). Reads `type`, `nodelay`, `timeout`,
 * `encoding`, `secure`, `credentials`, `handlers`, `port` and `host` from
 * that object, attaches a new socket to it as `socket`, starts connecting,
 * and returns the same object.
 *
 * Handler arrays in `handlers` are invoked with a per-connection object as
 * `this` (fields: `socket`, `queue`, `bytesread`, `byteswritten`, `write`,
 * `reconnect`).
 *
 * @returns {Object} the options object (`this`), now carrying `socket`.
 */
function Client() {
  var _self = this;
  _self.socket = new net.Stream();
  if (!_self.type) {
    _self.type = "inet";
  }
  _self.socket.setNoDelay(_self.nodelay);
  _self.socket.setTimeout(_self.timeout);
  var connection = {
    "socket": _self.socket,
    "queue": [],
    "bytesread": 0,
    "byteswritten": 0
  };
  // Falls back to the client-wide encoding when the caller passes none.
  connection.write = function (data, encoding) {
    if (!encoding) encoding = _self.encoding;
    return writesocket(connection, data, encoding);
  };
  connection.reconnect = function () {
    _self.socket.connect(_self.port, _self.host, _self.type);
  };
  if (_self.encoding && _self.encoding !== "binary") {
    // Text mode: the socket decodes incoming data and "data" handlers run.
    _self.socket.setEncoding(_self.encoding);
    _self.socket.addListener("data", function (data) {
      callchain(_self.handlers["data"], connection, arguments);
      // A decoded string's .length counts UTF-16 code units, not bytes, so
      // multi-byte text would be under-counted; measure encoded bytes.
      connection.bytesread += (typeof data === "string")
        ? Buffer.byteLength(data, _self.encoding)
        : data.length;
    });
  }
  else {
    // Binary mode: "binary" handlers receive (buffer, start, end).
    _self.socket.ondata = function (buffer, start, end) {
      callchain(_self.handlers["binary"], connection, arguments);
      connection.bytesread += (end - start);
    };
  }
  // Registered before the generic forwarding below, so the secure upgrade is
  // requested before user "connect" handlers run.
  _self.socket.addListener("connect", function () {
    if (_self.secure) {
      if (!_self.credentials) {
        _self.socket.setSecure();
      }
      else {
        _self.socket.setSecure(crypto.createCredentials({
          "cert": fs.readFileSync(_self.credentials.cert),
          "key": fs.readFileSync(_self.credentials.key)
        }));
      }
    }
  });
  // Forward socket events to the matching handler arrays, when present.
  ["connect", "end", "close", "timeout", "error", "secure", "drain"].forEach(function (event) {
    _self.socket.addListener(event, function () {
      if (_self.handlers[event]) callchain(_self.handlers[event], connection, arguments);
    });
  });
  _self.socket.connect(_self.port, _self.host, _self.type);
  return _self;
}
/**
 * Stream server. Meant to be invoked with `this` bound to an options object
 * (e.g. `streams.server.call({...})`). Reads `secure`, `credentials`, `type`,
 * `timeout`, `nodelay`, `encoding`, `closeontimeout`, `handlers`, `multi`,
 * `port` and `host` from that object, attaches `server` and `listen` to it,
 * and returns the same object. Nothing is bound until `listen()` is called.
 *
 * Per-socket handler arrays are invoked with a per-connection object as
 * `this` (fields: `socket`, `queue`, `bytesread`, `byteswritten`, `write`).
 * The "listen", "stop" and "node" handler arrays are invoked with the
 * options object as `this`.
 *
 * @returns {Object} the options object (`this`), now carrying `server` and `listen`.
 */
function Server() {
  var _self = this;
  if (_self.secure) {
    // Certificate and key are read synchronously once, at construction time.
    var credentials = crypto.createCredentials({
      "cert": fs.readFileSync(_self.credentials.cert),
      "key": fs.readFileSync(_self.credentials.key)
    });
  }
  if (!_self.type) {
    _self.type = "inet";
  }
  _self.server = net.createServer(function (socket) {
    var connection = {
      "socket": socket,
      "queue": [],
      "bytesread": 0,
      "byteswritten": 0
    };
    // Falls back to the server-wide encoding when the caller passes none.
    connection.write = function (data, encoding) {
      if (!encoding) encoding = _self.encoding;
      return writesocket(connection, data, encoding);
    };
    socket.setTimeout(_self.timeout);
    socket.setNoDelay(_self.nodelay);
    if (_self.encoding && _self.encoding !== "binary") {
      // Text mode: the socket decodes incoming data and "data" handlers run.
      socket.setEncoding(_self.encoding);
      socket.addListener("data", function (data) {
        callchain(_self.handlers["data"], connection, arguments);
        // A decoded string's .length counts UTF-16 code units, not bytes, so
        // multi-byte text would be under-counted; measure encoded bytes.
        connection.bytesread += (typeof data === "string")
          ? Buffer.byteLength(data, _self.encoding)
          : data.length;
      });
    }
    else {
      // Binary mode: "binary" handlers receive (buffer, start, end).
      socket.ondata = function (buffer, start, end) {
        callchain(_self.handlers["binary"], connection, arguments);
        connection.bytesread += (end - start);
      };
    }
    if (_self.secure) {
      socket.setSecure(credentials);
    }
    // Registered before the generic forwarding below, so the socket is
    // destroyed (when `closeontimeout` is set) before user "timeout" handlers run.
    socket.addListener("timeout", function () {
      if (_self.closeontimeout) socket.destroy();
    });
    ["end", "timeout", "connect", "close", "secure", "error", "drain"].forEach(function (event) {
      socket.addListener(event, function () {
        callchain(_self.handlers[event], connection, arguments);
      });
    });
  });
  _self.server.addListener("listening", function () {
    // NOTE(review): `this` is whatever the emitter binds for listeners,
    // presumably the net server rather than the options object; confirm.
    this.state = _server_states.listening;
    callchain(_self.handlers["listen"], _self, arguments);
  });
  _self.server.addListener("close", function () {
    this.state = _server_states.paused;
    callchain(_self.handlers["stop"], _self, arguments);
  });
  /**
   * Starts accepting connections. When `multi` options are set, listening is
   * delegated to the lazily required "./multi-node" module; otherwise the
   * server listens directly on `port` / `host` / `type`.
   */
  _self.listen = function () {
    if (_self.multi) {
      if (!multi) multi = require("./multi-node");
      var nodes = multi.listen({
        "port": _self.port,
        "nodes": _self.multi.nodes,
        "masterListen": _self.multi.masterListen
      }, _self.server);
      nodes.addListener("node", function (stream) {
        _self.server.nodestream = stream;
        callchain(_self.handlers["node"], _self, arguments);
      });
    }
    else {
      _self.server.listen(_self.port, _self.host, _self.type);
    }
  };
  return _self;
}
// Public API of the stream module. `client` and `server` are meant to be
// invoked with an options object as `this` (e.g. `streams.client.call({...})`).
exports.client = Client;
exports.server = Server;
// State-name lookup tables.
exports.streamStates = _states;
exports.serverStates = _server_states;
var streams = require("node-stream");
// First command-line argument (undefined when none is given). Read via
// process.argv: the upper-case process.ARGV alias is undefined in current
// Node.js, so indexing it throws a TypeError at load time.
var maxconn = process.argv[2];
// Incremented each time a client's "connect" handler fires.
var connected = 0;
/**
 * Creates a stream client for "/tmp/proxy.sock" that sends "global\r\n" once
 * connected and prints any text it receives. Every handler runs with the
 * connection object as `this`.
 *
 * @returns {Object} whatever `streams.client` returns for these options.
 */
function tcpclient() {
  // Count the connection and send the opening command.
  function onConnect() {
    connected++;
    this.write("global\r\n");
  }

  // Close our side when the peer ends the stream.
  function onEnd() {
    this.socket.end();
  }

  // Close our side when the socket times out.
  function onTimeout() {
    this.socket.end();
  }

  // Print incoming text.
  function onData(data) {
    console.log(data.toString());
  }

  function onDrain() {
  }

  function onError(exception) {
    console.log(JSON.stringify(exception));
  }

  function onBinary(buffer, start, end) {
  }

  function onSecure() {
  }

  function onClose() {
  }

  var options = {
    secure: false,
    port: "/tmp/proxy.sock",
    host: "0.0.0.0",
    nodelay: true,
    timeout: 0,
    encoding: "utf8",
    handlers: {
      connect: [onConnect],
      end: [onEnd],
      timeout: [onTimeout],
      data: [onData],
      drain: [onDrain],
      error: [onError],
      binary: [onBinary],
      secure: [onSecure],
      close: [onClose]
    }
  };

  return streams.client.call(options);
}
var tcpc = tcpclient();
var net = require("net");
var streams = require("node-stream");
var repl = require("repl");
// Handler table for the REPL server. Every handler runs with the
// per-connection object as `this`.
var tcpdHandlers = {};

// Attach a REPL to each new socket and expose the connection inside it.
tcpdHandlers["connect"] = [
  function () {
    console.log("connection");
    var replContext = repl.start("$>", this.socket).context;
    replContext.connection = this;
  }
];
tcpdHandlers["end"] = [
  function () {
    console.log("close");
  }
];
tcpdHandlers["error"] = [
  function (exception) {
    console.log(JSON.stringify(exception));
  }
];
tcpdHandlers["timeout"] = [
  function () {
  }
];
tcpdHandlers["drain"] = [
  function () {
  }
];
tcpdHandlers["data"] = [
  function (data) {
  }
];
tcpdHandlers["binary"] = [
  function (buffer, start, end) {
  }
];
tcpdHandlers["secure"] = [
  function () {
  }
];
tcpdHandlers["close"] = [
  function () {
    this.socket.end();
  }
];

// Server on "/tmp/proxy.sock"; `secure` is off, so the credentials are not read.
var tcpd = streams.server.call({
  secure: false,
  credentials: {
    cert: "./ssl/proxy1.cert.pem",
    key: "./ssl/proxy1.key.pem"
  },
  port: "/tmp/proxy.sock",
  host: "0.0.0.0",
  nodelay: true,
  encoding: "utf8",
  timeout: 0,
  closeontimeout: true,
  handlers: tcpdHandlers
});

tcpd.listen();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment