/*
 * Gist 4rzael/0753e588a94687e1962b - "The logging version of MQTT client.js"
 * Created December 7, 2015 21:49.
 *
 * (The GitHub page chrome that preceded the code - the "Save ... to your
 * computer and use it in GitHub Desktop" prompt and the bidirectional-Unicode
 * warning banner - is not part of the source file.)
 */
'use strict';

/**
 * Module dependencies
 */
/*global setImmediate:true*/
var events = require('events'),
  Store = require('./store'),
  eos = require('end-of-stream'),
  mqttPacket = require('mqtt-packet'),
  Writable = require('readable-stream').Writable,
  inherits = require('inherits'),
  reInterval = require('reinterval'),
  fs = require('fs'),
  // Fall back to process.nextTick where setImmediate is missing (node v0.8)
  setImmediate = global.setImmediate || function (callback) {
    process.nextTick(callback);
  },
  // Applied by the MqttClient constructor to every option left undefined.
  // keepalive is multiplied by 1000 before being used as a timer period;
  // reconnectPeriod and connectTimeout are handed to timers unchanged.
  defaultConnectOptions = {
    keepalive: 10,
    protocolId: 'MQTT',
    protocolVersion: 4,
    reconnectPeriod: 1000,
    connectTimeout: 30 * 1000,
    clean: true
  };
// Create the CSV packet log with its header row the first time the module is
// loaded. The 'wx' flag creates the file only if it does not exist yet, so an
// existing log (possibly already being appended to) is never truncated and no
// separate existence check is needed.
fs.writeFile('./mqttData.csv', 'type;date;size;packet\n', {flag: 'wx'}, function (err) {
  // EEXIST simply means the log is already there; anything else is reported
  // but must not prevent the client from working.
  if (err && 'EEXIST' !== err.code) {
    console.error('mqtt packet log: cannot create ./mqttData.csv: ' + err.message);
  }
});
/**
 * Build the client id used when the caller does not supply one.
 *
 * @returns {String} 'mqttjs_' followed by exactly 8 lowercase hex digits
 */
function defaultId () {
  // Draw a 32-bit integer and left-pad it. Slicing the hex form of the raw
  // fraction could produce fewer than 8 digits (0.5 is "0.8" in base 16).
  var hex = Math.floor(Math.random() * 0x100000000).toString(16);

  return 'mqttjs_' + ('00000000' + hex).slice(-8);
}
/**
 * Serialize a packet, append a record of it to the CSV log and write the
 * bytes to the client's stream.
 *
 * @param {Object} client - object exposing `stream` and `emit`
 * @param {Object} packet - packet description handed to mqttPacket.generate
 * @param {Function} [cb] - function(err){}; called immediately when the
 *    stream accepted the write, on the stream's next 'drain' when it did not,
 *    or with the error when serializing/writing threw. Without a callback
 *    such an error is emitted on the client as 'error'.
 */
function sendPacket (client, packet, cb) {
  var buf;

  try {
    buf = mqttPacket.generate(packet);

    // Logging is best-effort: a failure to record the packet must never
    // prevent it from being sent, so it gets its own guard and a callback
    // (fs refuses asynchronous calls without one on current Node versions).
    try {
      fs.appendFile(
        './mqttData.csv',
        'output;' + (new Date().toISOString()) + ';' + buf.length + ';' + JSON.stringify(packet) + '\n',
        function ignoreLogError () {}
      );
    } catch (logErr) {
      // deliberately ignored, see above
    }

    if (!client.stream.write(buf) && cb) {
      // the stream is applying backpressure: report completion once drained
      client.stream.once('drain', cb);
    } else if (cb) {
      cb();
    }
  } catch (err) {
    if (cb) {
      cb(err);
    } else {
      client.emit('error', err);
    }
  }
}
/**
 * Persist a packet in the client's outgoing store, then send it.
 *
 * @param {Object} client - object exposing `outgoingStore`
 * @param {Object} packet - packet to store and send
 * @param {Function} [cb] - function(err){}; receives the store error when
 *    the packet could not be stored (it is then not sent), otherwise it is
 *    passed on to sendPacket
 */
function storeAndSend (client, packet, cb) {
  client.outgoingStore.put(packet, function storedPacket (err) {
    if (err) {
      return cb && cb(err);
    }
    sendPacket(client, packet, cb);
  });
}
/**
 * Shared do-nothing function, used wherever a callback or listener is
 * required but nothing has to happen.
 */
function nop () {}
/**
 * MqttClient constructor
 *
 * @param {Function} streamBuilder - called with the client every time a
 *    connection is set up; must return the stream to talk MQTT over
 * @param {Object} [options] - connection options
 *    (see Connection#connect); every key of defaultConnectOptions that is
 *    undefined is filled in, as is `clientId`. The object is used directly,
 *    not copied.
 */
function MqttClient (streamBuilder, options) {
  var k,
    that = this;

  // allow calling without `new`
  if (!(this instanceof MqttClient)) {
    return new MqttClient(streamBuilder, options);
  }

  this.options = options || {};

  // Defaults
  for (k in defaultConnectOptions) {
    if ('undefined' === typeof this.options[k]) {
      this.options[k] = defaultConnectOptions[k];
    }
  }

  this.options.clientId = this.options.clientId || defaultId();

  this.streamBuilder = streamBuilder;

  // Inflight message storages
  this.outgoingStore = this.options.outgoingStore || new Store();
  this.incomingStore = this.options.incomingStore || new Store();

  // Ping timer, setup in _setupPingTimer
  this.pingTimer = null;
  // Is the client connected?
  this.connected = false;
  // Are we intentionally disconnecting?
  this.disconnecting = false;
  // Packet queue
  this.queue = [];
  // connack timer
  this.connackTimer = null;
  // Reconnect timer
  this.reconnectTimer = null;
  // MessageIDs start at a random value in 1..65535 (0 is not a valid id)
  this.nextId = 1 + Math.floor(Math.random() * 65535);

  // Inflight callbacks
  this.outgoing = {};

  // Mark connected on connect and replay the packets left in the outgoing store
  this.on('connect', function () {
    this.connected = true;
    var outStore = null;
    outStore = this.outgoingStore.createStream();

    // Control of stored messages
    outStore.once('readable', function () {
      function storeDeliver () {
        var packet = outStore.read(1);
        if (!packet) {
          return;
        }
        // Avoid unnecessary stream read operations when disconnected
        if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
          outStore.read(0);
          // Ensure that the next message will only be read after callback is issued
          that.outgoing[packet.messageId] = storeDeliver;
          that._sendPacket(packet);
        } else if (outStore.destroy) {
          outStore.destroy();
        }
      }
      storeDeliver();
    })
    .on('error', this.emit.bind(this, 'error'));
  });

  // Mark disconnected on stream close
  this.on('close', function () {
    this.connected = false;
  });

  // Setup ping timer
  this.on('connect', this._setupPingTimer);

  // Send queued packets, one at a time, each after the previous one completed
  this.on('connect', function () {
    var queue = this.queue;

    function deliver () {
      var entry = queue.shift(),
        packet = null;

      if (!entry) {
        return;
      }

      packet = entry.packet;

      that._sendPacket(
        packet,
        function (err) {
          if (entry.cb) {
            entry.cb(err);
          }
          deliver();
        }
      );
    }

    deliver();
  });

  // Clear ping timer
  this.on('close', function () {
    if (null !== that.pingTimer) {
      that.pingTimer.clear();
      that.pingTimer = null;
    }
  });

  // Setup reconnect timer on disconnect
  this.on('close', this._setupReconnect);

  events.EventEmitter.call(this);

  this._setupStream();
}
inherits(MqttClient, events.EventEmitter);
/**
 * setup the event handlers in the inner stream.
 *
 * Builds a new stream with the stream builder, pipes it through the packet
 * parser, logs and dispatches every parsed packet, sends the connect packet
 * and arms the connack timeout.
 *
 * @api private
 */
MqttClient.prototype._setupStream = function () {
  var connectPacket,
    that = this,
    writable = new Writable(),
    parser = mqttPacket.parser(this.options),
    completeParse = null,
    packets = [],
    chunkSize = 0;

  this._clearReconnect();

  this.stream = this.streamBuilder(this);

  parser.on('packet', function (packet) {
    packets.push(packet);
  });

  // Handle the parsed packets one at a time; the chunk is acknowledged to the
  // Writable only once all of its packets have been handled.
  function processPackets () {
    var packet = packets.shift(),
      done = completeParse;

    if (packet) {
      // The size column is the byte length of the chunk the packet was
      // parsed from, not of the individual packet. It is kept in a plain
      // number so that a packet handled asynchronously cannot leave the next
      // packet of the same chunk without a size to log.
      fs.appendFile(
        './mqttData.csv',
        'input;' + (new Date().toISOString()) + ';' + chunkSize + ';' + JSON.stringify(packet) + '\n',
        function ignoreLogError () {}
      );
      that._handlePacket(packet, processPackets);
    } else {
      completeParse = null;
      if (done) {
        done();
      }
    }
  }

  writable._write = function (buf, enc, done) {
    completeParse = done;
    chunkSize = buf.length;
    parser.parse(buf);
    processPackets();
  };

  this.stream.pipe(writable);

  // Suppress connection errors
  this.stream.on('error', nop);

  // Echo stream close
  eos(this.stream, this.emit.bind(this, 'close'));

  // Send a connect packet
  connectPacket = Object.create(this.options);
  connectPacket.cmd = 'connect';
  // avoid message queue
  sendPacket(this, connectPacket);

  // Echo connection errors
  parser.on('error', this.emit.bind(this, 'error'));

  // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
  this.stream.setMaxListeners(1000);

  // Give up on the connection when no connack arrives in time
  clearTimeout(this.connackTimer);
  this.connackTimer = setTimeout(function () {
    that._cleanUp(true);
  }, this.options.connectTimeout);
};
/**
 * _handlePacket - dispatch an incoming packet to its handler
 *
 * @param {Object} packet - parsed packet, dispatched on `packet.cmd`
 * @param {Function} done - called once the packet has been dealt with so the
 *    next one can be processed
 * @api private
 */
MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done);
      break;
    case 'puback':
    case 'pubrec':
    case 'pubcomp':
    case 'suback':
    case 'unsuback':
      this._handleAck(packet);
      done();
      break;
    case 'pubrel':
      this._handlePubrel(packet, done);
      break;
    case 'connack':
      this._handleConnack(packet);
      done();
      break;
    case 'pingresp':
      this._handlePingresp(packet);
      done();
      break;
    default:
      // Unknown packets are ignored, but `done` must still be called or no
      // further incoming packet would ever be processed.
      done();
      break;
  }

  // When a packet is received, reschedule the ping timer
  this._shiftPingInterval();
};
/**
 * _checkDisconnecting - report an error when the client is disconnecting
 *
 * @param {Function} [callback] - receives the 'client disconnecting' error;
 *    without a callback the error is emitted as an 'error' event instead
 * @returns {Boolean} whether the client is disconnecting
 * @api private
 */
MqttClient.prototype._checkDisconnecting = function (callback) {
  if (this.disconnecting) {
    if (callback) {
      callback(new Error('client disconnecting'));
    } else {
      this.emit('error', new Error('client disconnecting'));
    }
  }
  return this.disconnecting;
};
/**
 * publish - publish <message> to <topic>
 *
 * @param {String} topic - topic to publish to
 * @param {String, Buffer} message - message to publish
 * @param {Object} [opts] - publish options, includes:
 *    {Number} qos - qos level to publish on
 *    {Boolean} retain - whether or not to retain the message
 * @param {Function} [callback] - function(err){}
 *    called when publish succeeds or fails
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.publish('topic', 'message');
 * @example
 *     client.publish('topic', 'message', {qos: 1, retain: true});
 * @example client.publish('topic', 'message', console.log);
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet;

  // .publish(topic, payload, cb);
  if ('function' === typeof opts) {
    callback = opts;
    opts = null;
  }

  // Default opts
  if (!opts) {
    opts = {qos: 0, retain: false};
  }

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId()
  };

  switch (opts.qos) {
    case 1:
    case 2:
      // The callback is kept until the matching ack arrives (see _handleAck)
      this.outgoing[packet.messageId] = callback || nop;
      this._sendPacket(packet);
      break;
    default:
      // No ack to wait for: the callback fires once the packet is sent
      this._sendPacket(packet, callback);
      break;
  }

  return this;
};
/**
 * subscribe - subscribe to <topic>
 *
 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Number} qos - subscribe qos level
 * @param {Function} [callback] - function(err, granted){} where:
 *    {Error} err - subscription error (none at the moment!)
 *    {Array} granted - array of {topic: 't', qos: 0}
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.subscribe('topic');
 * @example client.subscribe('topic', {qos: 1});
 * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
 * @example client.subscribe('topic', console.log);
 */
MqttClient.prototype.subscribe = function () {
  var packet,
    args = Array.prototype.slice.call(arguments),
    subs = [],
    obj = args.shift(),
    callback = args.pop() || nop,
    opts = args.pop();

  if ('string' === typeof obj) {
    obj = [obj];
  }

  // .subscribe(topic, opts): the last argument was the options object.
  // This has to be sorted out before the disconnect check, which invokes
  // the callback.
  if ('function' !== typeof callback) {
    opts = callback;
    callback = nop;
  }

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  if (!opts) {
    opts = { qos: 0 };
  }

  if (Array.isArray(obj)) {
    // list of topics, all subscribed at opts.qos
    obj.forEach(function (topic) {
      subs.push({
        topic: topic,
        qos: opts.qos
      });
    });
  } else {
    // {topic: qos} map
    Object
      .keys(obj)
      .forEach(function (k) {
        subs.push({
          topic: k,
          qos: obj[k]
        });
      });
  }

  packet = {
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  };

  // The callback is kept until the suback arrives (see _handleAck)
  this.outgoing[packet.messageId] = callback;

  this._sendPacket(packet);

  return this;
};
/**
 * unsubscribe - unsubscribe from topic(s)
 *
 * @param {String, Array} topic - topics to unsubscribe from
 * @param {Function} [callback] - callback fired on unsuback, or with an
 *    error when the client is disconnecting or `topic` is not a string or a
 *    non-empty list
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.unsubscribe('topic');
 * @example client.unsubscribe('topic', console.log);
 */
MqttClient.prototype.unsubscribe = function (topic, callback) {
  var packet,
    topics;

  callback = callback || nop;

  if (this._checkDisconnecting(callback)) {
    return this;
  }

  if ('string' === typeof topic) {
    topics = [topic];
  } else if (topic && 'object' === typeof topic && topic.length) {
    topics = topic;
  } else {
    // Sending a packet without topics could never be acknowledged and would
    // leave its callback registered forever, so refuse right away.
    callback(new Error('Invalid topic'));
    return this;
  }

  // The message id is only drawn once the request is known to be sent
  packet = {
    cmd: 'unsubscribe',
    qos: 1,
    messageId: this._nextId(),
    unsubscriptions: topics
  };

  // The callback is kept until the unsuback arrives (see _handleAck)
  this.outgoing[packet.messageId] = callback;

  this._sendPacket(packet);

  return this;
};
/**
 * end - close connection
 *
 * @returns {MqttClient} this - for chaining
 * @param {Boolean} force - do not wait for all in-flight messages to be acked
 * @param {Function} cb - called when the client has been closed
 *
 * @api public
 */
MqttClient.prototype.end = function (force, cb) {
  var that = this;

  // .end(cb);
  if ('function' === typeof force) {
    cb = force;
    force = false;
  }

  function closeStores () {
    that.incomingStore.close(function () {
      that.outgoingStore.close(cb);
    });
  }

  function finish () {
    that._cleanUp(force, closeStores);
  }

  // Already on its way out: nothing more to do, but keep the call chainable
  if (this.disconnecting) {
    return this;
  }

  this.disconnecting = true;

  if (!force && 0 < Object.keys(this.outgoing).length) {
    // wait 10ms, just to be sure we received all of it
    this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
  } else {
    finish();
  }

  return this;
};
/**
 * _reconnect - implement reconnection
 *
 * Emits 'reconnect', then sets up a fresh stream.
 *
 * @api privateish
 */
MqttClient.prototype._reconnect = function () {
  this.emit('reconnect');
  this._setupStream();
};
/**
 * _setupReconnect - setup reconnect timer
 *
 * Does nothing while the client is disconnecting, when a timer is already
 * running, or when options.reconnectPeriod is not positive. Otherwise emits
 * 'offline' and retries every options.reconnectPeriod.
 */
MqttClient.prototype._setupReconnect = function () {
  var that = this;

  if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
    this.emit('offline');
    that.reconnectTimer = setInterval(function () {
      that._reconnect();
    }, that.options.reconnectPeriod);
  }
};
/**
 * _clearReconnect - clear the reconnect timer
 *
 * Stops a running reconnect interval and resets the handle to null, the
 * value the constructor starts it with.
 */
MqttClient.prototype._clearReconnect = function () {
  if (this.reconnectTimer) {
    clearInterval(this.reconnectTimer);
    this.reconnectTimer = null;
  }
};
/**
 * _cleanUp - clean up on connection end
 *
 * @param {Boolean} forced - destroy the stream right away instead of sending
 *    a disconnect packet and ending the stream afterwards
 * @param {Function} [done] - registered as listener for the stream's 'close'
 * @api private
 */
MqttClient.prototype._cleanUp = function (forced, done) {
  if (done) {
    this.stream.on('close', done);
  }

  if (forced) {
    this.stream.destroy();
  } else {
    // End the stream on the tick after the disconnect packet was sent
    this._sendPacket(
      { cmd: 'disconnect' },
      setImmediate.bind(
        null,
        this.stream.end.bind(this.stream)
      )
    );
  }

  // Restart a running reconnect timer
  if (this.reconnectTimer) {
    this._clearReconnect();
    this._setupReconnect();
  }

  if (null !== this.pingTimer) {
    this.pingTimer.clear();
    this.pingTimer = null;
  }
};
/**
 * _sendPacket - send or queue a packet
 *
 * While the client is not connected the packet is put on the queue, which is
 * sent on 'connect'. Otherwise packets with qos 1 or 2 are stored before they
 * are sent and all others are sent directly.
 *
 * @param {Object} packet - packet to send
 * @param {Function} [cb] - callback when the packet is sent
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  if (!this.connected) {
    return this.queue.push({ packet: packet, cb: cb });
  }

  switch (packet.qos) {
    case 2:
    case 1:
      storeAndSend(this, packet, cb);
      break;
    // qos 0 is listed for readability only, it is handled like a packet
    // without qos
    case 0:
      /* falls through */
    default:
      sendPacket(this, packet, cb);
      break;
  }
};
/**
 * _setupPingTimer - setup the ping timer
 *
 * Starts checking the connection every options.keepalive seconds, unless a
 * timer is already running or keepalive is disabled (falsy).
 *
 * @api private
 */
MqttClient.prototype._setupPingTimer = function () {
  var that = this;

  if (!this.pingTimer && this.options.keepalive) {
    // pretend a pingresp was received so the first check sends a pingreq
    this.pingResp = true;
    this.pingTimer = reInterval(function () {
      that._checkPing();
    }, this.options.keepalive * 1000);
  }
};
/**
 * _shiftPingInterval - reschedule the ping interval
 *
 * Restarts the keepalive period of a running ping timer; does nothing when
 * there is no timer or keepalive is disabled.
 *
 * @api private
 */
MqttClient.prototype._shiftPingInterval = function () {
  if (this.pingTimer && this.options.keepalive) {
    this.pingTimer.reschedule(this.options.keepalive * 1000);
  }
};
/**
 * _checkPing - check if a pingresp has come back, and ping the server again
 *
 * When the previous ping is still unanswered the connection is torn down.
 *
 * @api private
 */
MqttClient.prototype._checkPing = function () {
  if (this.pingResp) {
    this.pingResp = false;
    this._sendPacket({ cmd: 'pingreq' });
  } else {
    // do a forced cleanup since socket will be in bad shape
    this._cleanUp(true);
  }
};
/**
 * _handlePingresp - handle a pingresp
 *
 * Records that the last ping was answered, which _checkPing looks at.
 *
 * @api private
 */
MqttClient.prototype._handlePingresp = function () {
  this.pingResp = true;
};
/**
 * _handleConnack
 *
 * Cancels the connack timeout, then emits 'connect' for return code 0 and
 * an 'error' describing the refusal for any positive return code.
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode,
    // TODO: move to protocol
    errors = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ];

  clearTimeout(this.connackTimer);

  if (0 === rc) {
    this.emit('connect', packet);
  } else if (0 < rc) {
    // codes without an entry in the table still get a meaningful message
    this.emit('error',
        new Error('Connection refused: ' + (errors[rc] || 'Unknown return code ' + rc)));
  }
};
/**
 * _handlePublish
 *
 * qos 0: emit 'message' and hand the packet to handleMessage.
 * qos 1: additionally send a puback, without waiting for it to be sent.
 * qos 2: store the packet and answer with a pubrec; 'message' is emitted
 *        later, when the pubrel arrives.
 *
 * @param {Object} packet
 * @param {Function} done - called when the next packet may be processed
 * @api private
 */
/*
  The last two cases should be rewritten to comply with the coding style
  (no fall-through):

    case 1:
    case 0:
      // do not wait sending a puback
      // no callback passed
      if (1 === qos) {
        this._sendPacket({
          cmd: 'puback',
          messageId: mid
        });
      }
      // emit the message event for both qos 1 and 0
      this.emit('message', topic, message, packet);
      this.handleMessage(packet, done);
      break;
    default:
      // do nothing but every switch must have a default
      // log or throw an error about unknown qos
      break;

  For now the warnings are just suppressed.
*/
MqttClient.prototype._handlePublish = function (packet, done) {
  var topic = packet.topic.toString(),
    message = packet.payload,
    qos = packet.qos,
    mid = packet.messageId,
    that = this;

  switch (qos) {
    case 2:
      this.incomingStore.put(packet, function (err) {
        if (err) {
          // Not stored, so not acknowledged; report it and carry on with
          // the next packet.
          that.emit('error', err);
          return done();
        }
        that._sendPacket({cmd: 'pubrec', messageId: mid}, done);
      });
      break;
    case 1:
      // do not wait sending a puback
      // no callback passed
      this._sendPacket({
        cmd: 'puback',
        messageId: mid
      });
      /* falls through */
    case 0:
      // emit the message event for both qos 1 and 0
      this.emit('message', topic, message, packet);
      this.handleMessage(packet, done);
      break;
    default:
      // Unknown qos: the packet is dropped, but `done` must still be called
      // or no further incoming packet would ever be processed.
      done();
      break;
  }
};
/**
 * Handle messages with backpressure support, one at a time.
 * Override at will.
 *
 * The default implementation does nothing but call the callback.
 *
 * @param Packet packet the packet
 * @param Function callback call when finished
 * @api public
 */
MqttClient.prototype.handleMessage = function (packet, callback) {
  callback();
};
/**
 * _handleAck
 *
 * Resolves the callback registered for the acknowledged message id:
 * puback, pubcomp and unsuback remove the packet from the outgoing store and
 * pass the callback on to it; pubrec is answered with a pubrel; suback
 * reports the subscriptions with their granted qos. Acks for unknown message
 * ids are ignored.
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleAck = function (packet) {
  var mid = packet.messageId,
    type = packet.cmd,
    response = null,
    cb = this.outgoing[mid],
    that = this;

  if (!cb) {
    // Server sent an ack in error, ignore it.
    return;
  }

  // Process
  switch (type) {
    case 'pubcomp':
      // same thing as puback for QoS 2
      /* falls through */
    case 'puback':
      // Callback - we're done
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, cb);
      break;
    case 'pubrec':
      // the callback stays registered until the pubcomp arrives
      response = {
        cmd: 'pubrel',
        qos: 2,
        messageId: mid
      };

      this._sendPacket(response);
      break;
    case 'suback':
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, function (err, original) {
        var i,
          origSubs,
          granted;

        // Check first: without the original packet there is nothing to read
        if (err) {
          // missing packet, what should we do?
          return that.emit('error', err);
        }

        origSubs = original.subscriptions;
        granted = packet.granted;

        // never write past the subscriptions that were actually requested
        for (i = 0; i < granted.length && i < origSubs.length; i += 1) {
          origSubs[i].qos = granted[i];
        }

        cb(null, origSubs);
      });
      break;
    case 'unsuback':
      delete this.outgoing[mid];
      this.outgoingStore.del(packet, cb);
      break;
    default:
      that.emit('error', new Error('unrecognized packet type'));
  }

  // end() waits for this event while messages are still in flight
  if (this.disconnecting &&
      0 === Object.keys(this.outgoing).length) {
    this.emit('outgoingEmpty');
  }
};
/**
 * _handlePubrel
 *
 * Looks up the stored packet for the released message id. A stored publish
 * is emitted as 'message' and replaced in the store by the pubrel, so that a
 * repeated pubrel does not emit it again. In both cases a pubcomp is sent.
 *
 * @param {Object} packet
 * @param {Function} [callback] - called when the next packet may be processed
 * @api private
 */
MqttClient.prototype._handlePubrel = function (packet, callback) {
  var mid = packet.messageId,
    that = this;

  that.incomingStore.get(packet, function (err, pub) {
    if (err) {
      that.emit('error', err);
      // keep the incoming stream moving even though this packet failed
      if (callback) {
        callback();
      }
      return;
    }

    if ('pubrel' !== pub.cmd) {
      that.emit('message', pub.topic, pub.payload, pub);
      that.incomingStore.put(packet);
    }

    that._sendPacket({cmd: 'pubcomp', messageId: mid}, callback);
  });
};
/**
 * _nextId - hand out the next message id
 *
 * Ids run from 1 to 65535 and wrap around to 1; a counter outside that
 * range (0 in particular, which is not a valid id) restarts at 1.
 *
 * @returns {Number} message id in 1..65535
 */
MqttClient.prototype._nextId = function () {
  var id = this.nextId;

  // Ensure 16 bit unsigned int, zero excluded:
  if (!(1 <= id && 65535 >= id)) {
    id = 1;
  }

  this.nextId = (65535 === id) ? 1 : id + 1;

  return id;
};
// The client constructor is the module's only export
module.exports = MqttClient;
// (GitHub page footer - "Sign up for free to join this conversation on
// GitHub. Already have an account? Sign in to comment" - is not part of the
// source file.)