Skip to content

Instantly share code, notes, and snippets.

@zhuzhonghua
Last active August 29, 2015 14:05
Show Gist options
  • Save zhuzhonghua/8265e1ecd8acf1570af5 to your computer and use it in GitHub Desktop.
Save zhuzhonghua/8265e1ecd8acf1570af5 to your computer and use it in GitHub Desktop.
socket nodejs
var net = require('net');
function ByteBuffer(para)
{
this.pos = 0;
// read
if(Buffer.isBuffer(para)){
this.buffer = para;
}
// write, len
else{
this.buffer = new Buffer(para);
}
this.toString = function(){
return this.buffer.toString('hex', this.pos);
};
this.readInt8 = function(){
var value = this.buffer.readInt8(this.pos);
this.pos += 1;
return value;
}
this.readUInt8 = function(){
var value = this.buffer.readUInt8(this.pos);
this.pos += 1;
return value;
}
this.readInt16 = function(){
var value = this.buffer.readInt16LE(this.pos);
this.pos += 2;
return value;
}
this.readUInt16 = function(){
var value = this.buffer.readUInt16LE(this.pos);
this.pos += 2;
return value;
}
this.readInt32 = function(){
var value = this.buffer.readInt32LE(this.pos);
this.pos += 4;
return value;
}
this.readUInt32 = function(){
var value = this.buffer.readUInt32LE(this.pos);
this.pos += 4;
return value;
}
this.readFloat = function(){
var value = this.buffer.readFloatLE(this.pos);
this.pos += 4;
return value;
}
this.readStr = function(len){
if(len <= 0){
return '';
}
else if(this.buffer.length - this.pos >= len){
var value = this.buffer.toString('utf8', this.pos, this.pos+len);
this.pos += len;
return value;
}
else{
return null;
}
}
///////////////////
this.writeInt8 = function(value){
this.buffer.writeInt8(value, this.pos);
this.pos += 1;
}
this.writeUInt8 = function(value){
this.buffer.writeUInt8(value, this.pos);
this.pos += 1;
}
this.writeInt16 = function(value){
this.buffer.writeInt16LE(value, this.pos);
this.pos += 2;
}
this.writeUInt16 = function(value){
this.buffer.writeUInt16LE(value, this.pos);
this.pos += 2;
}
this.writeInt32 = function(value){
this.buffer.writeInt32LE(value, this.pos);
this.pos += 4;
}
this.writeUInt32 = function(value){
this.buffer.writeUInt32LE(value, this.pos);
this.pos += 4;
}
this.writeFloat = function(value){
this.buffer.writeFloatLE(value, this.pos);
this.pos += 4;
}
this.writeStr = function(value){
if(value.length <= 0){
return;
}
this.buffer.write(value, this.pos, value.length);
this.pos += value.length;
}
}
var myutil = require('./myutil');
var LOG = myutil.LOG;
var ERROR = myutil.ERROR;
var CRASH = myutil.CRASH;
var DEBUG_BYTE = myutil.DEBUG_BYTE;
function Socket(option)
{
this.option = option;
this.inData = new Array();
this.outData = new Array();
this.writeBuffer = new ByteBuffer(1024);
this.active = false;
this.container = null;
var self = this;
// if client, connect to a address
if(option.client){
this._sock = net.Socket();
this._sock.connect(option.port, option.host, function(){
LOG("connected to "+option.port+" "+option.host);
self.active = true;
option.connSuccCB(self);
});
this._sockID = option.host+'('+option.port+')';
}
else{
this._sock = option.sock;
this.active = true;
this._sockID = this._sock.remoteAddress+'('+this._sock.remotePort+')';
}
this.msgCB = option.msgCB;
this.errCB = option.errCB;
this.init();
LOG("new socket "+this._sockID);
}
Socket.prototype.close = function(){
this.active = false;
this.inData = null;
this.outData = null;
if(this._sock){
this._sock.destroy();
}
this._sock = null;
this._sockID = '';
this.account = '';
};
Socket.prototype.id = function(){
return this._sockID;
};
Socket.prototype.sockErr = function(){
ERROR("sockErr "+this.id());
this.active = false;
var self = this;
this.errCB(self);
self.close();
};
Socket.prototype.connect = function(ip, port){
var self = this;
if(this.active == true){
ERROR("already connected "+this.option.host+" "+this.option.port);
}
};
Socket.prototype.init = function(){
// data is a Buffer, not a string
var self = this;
self._sock.on('data', function (data){
LOG(self._sockID + ' data recv ' + data.length);
DEBUG_BYTE(data);
self.inData.push(data);
self.handleInData();
});
self._sock.on('close', function (data){
LOG(self._sockID + ' sock end');
//removeSockObj(sockObj);
if(self._sock){
self.sockErr();
}
});
self._sock.on('error', function (data){
LOG(self._sockID + ' sock error');
if(self._sock){
self.sockErr();
}
});
};
// handle msg
Socket.prototype.handleInData = function()
{
var self = this;
var sockObj = self;
if(self.active == false){
ERROR("self active false, enter handleInData");
return;
}
try{
//var inBufArray = self.inData;
if(false == self.checkLenData(6)){
return;
}
var header = self.peekLenData(6);
var msgID = header.readUInt16();
var msgSize = header.readUInt32();
if(false == self.checkLenData(msgSize+6)){
LOG(sockObj._sockID + ' not enough buffer msgsize='+msgSize+', id='+msgID);
return;
}
LOG(sockObj._sockID + ' get msg ' + msgID + ' size ' + msgSize);
self.skipLenData(DEFINE.MSG_HEADER_SIZE);
var msgBody = self.readLenData(msgSize);
if(false == self.msgCB(self, msgID, msgBody.buffer)){
ERROR('false!!! handle msg ' + msgID);
}
process.nextTick(function(){
self.handleInData();
});
}
catch(e){
self.sockErr();
CRASH(e.stack);
}
};
Socket.prototype.checkLenData = function (len)
{
var bufArray = this.inData;
for(var i = 0; i < bufArray.length; ++i){
var buf = bufArray[i];
// ok
if(buf.length >= len){
return true;
}
// len -= buf.length
else{
len -= buf.length;
}
}
return false;
};
// read and remove data from buffer array
Socket.prototype.readLenData = function(len)
{
var bufArray = this.inData;
var ret = new Buffer(len);
var offset = 0;
while(bufArray.length > 0){
var buf = bufArray[0];
if(buf.length == len){
buf.copy(ret, offset, 0, buf.length);
bufArray.shift();
return new ByteBuffer(ret);
}
else if(buf.length > len){
buf.copy(ret, offset, 0, len);
// slice source data
bufArray[0] = buf.slice(len);
return new ByteBuffer(ret);
}
else{
buf.copy(ret, offset, 0, buf.length);
offset += buf.length;
len -= buf.length;
bufArray.shift();
}
}
return new ByteBuffer(ret);
};
// do not remove data from buffer array
Socket.prototype.peekLenData = function(len)
{
var bufArray = this.inData;
var ret = new Buffer(len);
var offset = 0;
for(var i = 0; i < bufArray.length; ++i){
var buf = bufArray[i];
if(buf.length == len){
buf.copy(ret, offset, 0, buf.length);
return new ByteBuffer(ret);
}
else if(buf.length > len){
buf.copy(ret, offset, 0, len);
return new ByteBuffer(ret);
}
else{
buf.copy(ret, offset, 0, buf.length);
offset += buf.length;
len -= buf.length;
}
}
return new ByteBuffer(ret);
};
// read and remove data from buffer array
Socket.prototype.skipLenData = function(len)
{
var bufArray = this.inData;
while(bufArray.length > 0){
var buf = bufArray[0];
if(buf.length == len){
bufArray.shift();
return;
}
else if(buf.length > len){
// slice source data
bufArray[0] = buf.slice(len);
return;
}
else{
len -= buf.length;
bufArray.shift();
}
}
};
Socket.prototype.sendByteBuffer = function(opcode, bufMsg){
if(this.active == false){
return;
}
var body = new ByteBuffer(bufMsg);
// head
var head = new ByteBuffer(6);
head.writeUInt16(opcode);
head.writeUInt32(body.buffer.length);
this.outData.push(head.buffer);
// body
this.outData.push(body.buffer);
LOG(this._sockID + ': send msg ' + opcode + ' size ' + body.buffer.length);
DEBUG_BYTE(head.buffer);
DEBUG_BYTE(body.buffer);
this.flushOutData();
};
Socket.prototype.flushOutData = function(){
var sockObj = this;
var outBuf = sockObj.outData;
if(outBuf && outBuf.length > 0){
var buf = outBuf[0];
var ret = this.writeData(buf);
// send all ok
if(true == ret){
LOG(sockObj._sockID + ': flush data full ' + buf.length);
outBuf.shift();
process.nextTick(function(){
sockObj.flushOutData();
});
}
// wait
else if(false == ret){
process.nextTick(function(){
sockObj.flushOutData();
});
}
// send partial
else{
LOG(sockObj._sockID + ': flush data partial ' + ret);
outBuf[0] = buf.slice(ret);
process.nextTick(function(){
sockObj.flushOutData();
});
}
}
};
// check if there any data pending
Socket.prototype.writeData = function(data){
var sock = this._sock;
if(sock.bufferSize > 0){
return false;
}
else{
// return ok
if(true == sock.write(data)){
return true;
}
// or queue buffer size
else{
return sock.bufferSize;
}
}
};
var net = require('net');
function Acceptor(port, cb)
{
this.port = port;
this.server = null;
this.newSockCB = cb;
this.server = net.createServer(this.newSockCB);
this.server.listen(this.port);
LOG("acceptor init on port="+this.port);
}
Acceptor.prototype.init = function(){
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment