Created
September 23, 2011 09:46
-
-
Save kersten/1237050 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
*This is a simple MQTT cient on node.js | |
*Author: Fan Yilun @CEIT @14 FEB 2011 | |
*/ | |
var sys = require('sys'); | |
var net = require('net'); | |
var EventEmitter = require('events').EventEmitter; | |
var MQTTCONNECT = 0x10; | |
var MQTTPUBLISH = 0x30; | |
var MQTTSUBSCRIBE = 0x80; //8<<4; | |
var KEEPALIVE = 15000; | |
//var client = Client(1883, '127.0.0.1', 'mirror'); | |
function MQTTClient(port, host, clientID) { | |
//EventEmitter.call(this); | |
this.connected = false; | |
this.sessionSend = false; | |
this.sessionOpened = false; | |
this.id = clientID; | |
//this.conn = net.createConnection(port || 1883, host || '127.0.0.1'); | |
this.conn = new net.Socket(); | |
this.conn.connect(port, host); | |
this.conn.setEncoding('binary'); | |
var self = this; | |
//Set timer | |
self.timeout = setTimeout(function() { | |
self.timeUp(); | |
}, 25000); | |
self.conn.addListener('data', function (data) { | |
if(!self.sessionOpened){ | |
//sys.puts("len:"+data.length+' @3:'+data.charCodeAt(3)+'\n'); | |
if(data.length==4 && data.charCodeAt(3)==0){ | |
self.sessionOpened = true; | |
sys.puts("Session opend\n"); | |
self.emit("sessionOpened"); | |
//reset timer | |
clearTimeout(self.timeout); | |
self.timeout = setTimeout(function() { | |
self.timeUp(); | |
}, 3000); | |
}else{ | |
clearTimeout(self.timeout); | |
self.emit("openSessionFailed"); | |
self.conn.end(); | |
//this.conn.destroy(); | |
return; | |
} | |
} else { | |
//sys.puts('len:' + data.length+' Data received:'+data+'\n'); | |
if(data.length > 2){ | |
var buf = new Buffer(data, 'binary'); | |
self.onData(buf); | |
} | |
} | |
}); | |
self.conn.addListener('connect', function () { | |
//sys.puts('connected\n'); | |
self.connected = true; | |
//Once connected, send open stream to broker | |
self.openSession(self.id); | |
}); | |
self.conn.addListener('end', function() { | |
self.connected = false; | |
self.sessionSend = false; | |
self.sessionOpened = false; | |
sys.puts('Connection closed by broker'); | |
}); | |
} | |
sys.inherits(MQTTClient, EventEmitter); | |
exports.MQTTClient = MQTTClient; | |
MQTTClient.prototype.timeUp = function(){ | |
if(this.connected && this.sessionOpened){ | |
//sys.puts('25s keep alive'); | |
this.live(); | |
} else if (!this.connected ){ | |
sys.puts('MQTT connect to server time out'); | |
this.emit("connectTimeOut"); | |
} else { | |
sys.puts('Unknow state'); | |
} | |
}; | |
MQTTClient.prototype.openSession = function (id) { | |
var i = 0; | |
var buffer = new Buffer(16+id.length); | |
buffer[i++] = MQTTCONNECT; | |
buffer[i++] = 14+id.length; | |
buffer[i++] = 0x00; | |
buffer[i++] = 0x06; | |
buffer[i++] = 0x4d; | |
buffer[i++] = 0x51; | |
buffer[i++] = 0x49; | |
buffer[i++] = 0x73; | |
buffer[i++] = 0x64; | |
buffer[i++] = 0x70; | |
buffer[i++] = 0x03; | |
buffer[i++] = 0x02; | |
//Keep alive for 30s | |
buffer[i++] = 0x00; | |
buffer[i++] = KEEPALIVE/500; //Keepalive for 30s | |
buffer[i++] = 0x00; | |
buffer[i++] = id.length; | |
for (var n = 0; n < id.length; n++) { //Insert client id | |
buffer[i++] = id.charCodeAt(n); //Convert string to utf8 | |
} | |
//sys.puts(buffer.toString('utf8',0, 16)+' '+buffer.length); | |
this.conn.write(buffer, "binary"); | |
this.sessionSend = true; | |
sys.puts('Connected as :'+id+'\n'); | |
//publish('node', 'here is nodejs'); | |
//this.subscribe('mirror'); | |
}; | |
/*subscribes to topics */ | |
MQTTClient.prototype.subscribe = function (sub_topic) { | |
if(this.connected){ | |
var i = 0; | |
var buffer = new Buffer(7+sub_topic.length); | |
; | |
//fixed header | |
buffer[i++] = MQTTSUBSCRIBE; | |
buffer[i++] = 5 + sub_topic.length; | |
//varibale header | |
buffer[i++] = 0x00; | |
buffer[i++] = 0x0a; //message id | |
//payload | |
buffer[i++] = 0x00; | |
buffer[i++] = sub_topic.length; | |
for (var n = 0; n < sub_topic.length; n++) { | |
buffer[i++] = sub_topic.charCodeAt(n); | |
} | |
buffer[i++] = 0x00; | |
//sys.puts(7+sub_topic.length); | |
sys.puts('Subcribe to:'+sub_topic); | |
//sys.puts("Subscribe send len:"+buffer.length+'\n'); | |
this.conn.write(buffer, "binary"); | |
//reset timer | |
var cc = this; | |
clearTimeout(this.timeout); | |
this.timeout = setTimeout(function() { | |
cc.timeUp(); | |
}, 25000); | |
} | |
}; | |
/*publishes to topics*/ | |
MQTTClient.prototype.publish = function (pub_topic, payload) { | |
if(this.connected){ | |
var i = 0, n = 0; | |
var var_header = new Buffer(2+pub_topic.length); | |
//Variable header | |
//Assume payload length no longer than 128 | |
var_header[i++] = 0; | |
var_header[i++] = pub_topic.length; | |
for (n = 0; n < pub_topic.length; n++) { | |
var_header[i++] = pub_topic.charCodeAt(n); | |
} | |
//QoS 1&2 | |
//var_header[i++] = 0; | |
//var_header[i++] = 0x03; | |
i = 0; | |
var buffer = new Buffer(2+var_header.length+payload.length); | |
//Fix header | |
buffer[i++] = MQTTPUBLISH; | |
buffer[i++] = payload.length + var_header.length; | |
for (n = 0; n < var_header.length; n++) { | |
buffer[i++] = var_header[n]; | |
} | |
for (n = 0; n < payload.length; n++) { //Insert payloads | |
buffer[i++] = payload.charCodeAt(n); | |
} | |
sys.puts("||Publish|| "+pub_topic+' : '+payload); | |
this.conn.write(buffer, "binary"); | |
//reset timer | |
var cc = this; | |
clearTimeout(this.timeout); | |
this.timeout = setTimeout(function() { | |
cc.timeUp(); | |
}, 25000); | |
} | |
}; | |
MQTTClient.prototype.onData = function(data){ | |
var type = data[0]>>4; | |
//sys.puts('\ntype:'+type); | |
//sys.puts('1:'+data[1]); | |
//sys.puts('2:'+data[2]); | |
//sys.puts('3:'+data[3]); | |
if (type == 3) { // PUBLISH | |
var multiplier = 1; | |
var value = 0; | |
var remaining = 0; | |
var cnt = 1; // Byte 1 | |
do { | |
remaining++; | |
if (remaining > 4) { | |
throw "Protocol Error" | |
} | |
digit = data[cnt]; | |
value += (digit & 127) * multiplier; | |
multiplier *= 128; | |
cnt++; | |
} while ((digit & 128) != 0); | |
cnt++; | |
var tl = data[cnt++]; //<<4 // Byte 1 2 | |
var topic = new Buffer(tl); | |
for(var i = 0; i < tl; i++){ | |
topic[i] = data[i+cnt]; | |
} | |
cnt = cnt + tl; | |
cnt++; | |
cnt++; | |
cnt++; | |
value = value - tl - 2; | |
if (value > 0) { | |
var message = new Buffer(value); | |
for (var i = 0; i < value; i++) { | |
message[i] = data[i + cnt - 3]; | |
} | |
//var payload = data.slice(tl+cnt, value); | |
this.emit("mqttData", topic, message); | |
} | |
cnt = cnt + value - 3; | |
if(typeof(data[cnt]) != 'undefined') { | |
var left = data.length - cnt; | |
var leftBuf = new Buffer(left, 'binary'); | |
for (var i = 0; i < left; i++) { | |
leftBuf[i] = data[i + cnt]; | |
} | |
this.onData(leftBuf); | |
} | |
} else if (type == 12) { // PINGREG -- Ask for alive | |
//Send [208, 0] to server | |
sys.puts('Send 208 0'); | |
var packet208 = new Buffer(2); | |
packet208[0] = 0xd0; | |
packet208[1] = 0x00; | |
this.conn.write(packet208, "binary"); | |
//reset timer | |
var cc = this; | |
clearTimeout(this.timeout); | |
this.timeout = setTimeout(function() { | |
cc.timeUp(); | |
}, 25000); | |
} | |
} | |
MQTTClient.prototype.live = function () { | |
//Send [192, 0] to server | |
var packet192 = new Buffer(2); | |
packet192[0] = 0xc0; | |
packet192[1] = 0x00; | |
this.conn.write(packet192, "binary"); | |
//reset timer | |
var cc = this; | |
clearTimeout(this.timeout); | |
this.timeout = setTimeout(function() { | |
cc.timeUp(); | |
//self.publish('node', 'hello wtf'); | |
}, 25000); //send keepavie every 25s | |
}; | |
MQTTClient.prototype.disconnect = function () { | |
//Send [224,0] to server | |
var packet224 = new Buffer(2); | |
packet224[0] = 0xe0; | |
packet224[1] = 0x00; | |
this.conn.write(packet224, "binary"); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment