Skip to content

Instantly share code, notes, and snippets.

@4rzael
Created December 7, 2015 21:49
Show Gist options
  • Save 4rzael/0753e588a94687e1962b to your computer and use it in GitHub Desktop.
Save 4rzael/0753e588a94687e1962b to your computer and use it in GitHub Desktop.
The logging version of MQTT client.js
'use strict';
/**
* Module dependencies
*/
/*global setImmediate:true*/
var events = require('events'),
Store = require('./store'),
eos = require('end-of-stream'),
mqttPacket = require('mqtt-packet'),
Writable = require('readable-stream').Writable,
inherits = require('inherits'),
reInterval = require('reinterval'),
fs = require('fs'),
setImmediate = global.setImmediate || function (callback) {
// works in node v0.8
process.nextTick(callback);
},
defaultConnectOptions = {
keepalive: 10,
protocolId: 'MQTT',
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true
};
fs.exists('./mqttData.csv', function (exists) {
if (!exists)
fs.writeFile('./mqttData.csv', 'type;date;size;packet\n');
});
function defaultId () {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8);
}
function sendPacket (client, packet, cb) {
try {
var buf = mqttPacket.generate(packet);
fs.writeFile('./mqttData.csv', 'output;' + (new Date().toISOString()) + ';' + buf.length + ';' + JSON.stringify(packet) + '\n', {flag: 'a'});
if (!client.stream.write(buf) && cb) {
client.stream.once('drain', cb);
} else if (cb) {
cb();
}
} catch (err) {
if (cb) {
cb(err);
} else {
client.emit('error', err);
}
}
}
function storeAndSend (client, packet, cb) {
client.outgoingStore.put(packet, function storedPacket (err) {
if (err) {
return cb && cb(err);
}
sendPacket(client, packet, cb);
});
}
function nop () {}
/**
* MqttClient constructor
*
* @param {Stream} stream - stream
* @param {Object} [options] - connection options
* (see Connection#connect)
*/
function MqttClient (streamBuilder, options) {
var k,
that = this;
if (!(this instanceof MqttClient)) {
return new MqttClient(streamBuilder, options);
}
this.options = options || {};
// Defaults
for (k in defaultConnectOptions) {
if ('undefined' === typeof this.options[k]) {
this.options[k] = defaultConnectOptions[k];
} else {
this.options[k] = options[k];
}
}
this.options.clientId = this.options.clientId || defaultId();
this.streamBuilder = streamBuilder;
// Inflight message storages
this.outgoingStore = this.options.outgoingStore || new Store();
this.incomingStore = this.options.incomingStore || new Store();
// Ping timer, setup in _setupPingTimer
this.pingTimer = null;
// Is the client connected?
this.connected = false;
// Are we disconnecting?
this.disconnecting = false;
// Packet queue
this.queue = [];
// Are we intentionally disconnecting?
this.disconnecting = false;
// connack timer
this.connackTimer = null;
// Reconnect timer
this.reconnectTimer = null;
// MessageIDs starting with 1
this.nextId = Math.floor(Math.random() * 65535);
// Inflight callbacks
this.outgoing = {};
// Mark connected on connect
this.on('connect', function () {
this.connected = true;
var outStore = null;
outStore = this.outgoingStore.createStream();
// Control of stored messages
outStore.once('readable', function () {
function storeDeliver () {
var packet = outStore.read(1);
if (!packet) {
return;
}
// Avoid unnecesary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
outStore.read(0);
// Ensure that the next message will only be read after callback is issued
that.outgoing[packet.messageId] = storeDeliver;
that._sendPacket(packet);
} else if (outStore.destroy) {
outStore.destroy();
}
}
storeDeliver();
})
.on('error', this.emit.bind(this, 'error'));
});
// Mark disconnected on stream close
this.on('close', function () {
this.connected = false;
});
// Setup ping timer
this.on('connect', this._setupPingTimer);
// Send queued packets
this.on('connect', function () {
var queue = this.queue;
function deliver () {
var entry = queue.shift(),
packet = null;
if (!entry) {
return;
}
packet = entry.packet;
that._sendPacket(
packet,
function (err) {
if (entry.cb) {
entry.cb(err);
}
deliver();
}
);
}
deliver();
});
// Clear ping timer
this.on('close', function () {
if (null !== that.pingTimer) {
that.pingTimer.clear();
that.pingTimer = null;
}
});
// Setup reconnect timer on disconnect
this.on('close', this._setupReconnect);
events.EventEmitter.call(this);
this._setupStream();
}
inherits(MqttClient, events.EventEmitter);
/**
* setup the event handlers in the inner stream.
*
* @api private
*/
MqttClient.prototype._setupStream = function () {
var connectPacket,
that = this,
writable = new Writable(),
parser = mqttPacket.parser(this.options),
completeParse = null,
packets = [],
lastBuf = null;
this._clearReconnect();
this.stream = this.streamBuilder(this);
parser.on('packet', function (packet) {
packets.push(packet);
});
function process () {
var packet = packets.shift(),
done = completeParse;
if (packet) {
fs.writeFile('./mqttData.csv', 'input;' + (new Date().toISOString()) + ';' + lastBuf.length + ';' + JSON.stringify(packet) + '\n', {flag: 'a'});
that._handlePacket(packet, process);
lastBuf = null;
} else {
completeParse = null;
done();
}
}
writable._write = function (buf, enc, done) {
completeParse = done;
lastBuf = buf;
parser.parse(buf);
process();
};
this.stream.pipe(writable);
// Suppress connection errors
this.stream.on('error', nop);
// Echo stream close
eos(this.stream, this.emit.bind(this, 'close'));
// Send a connect packet
connectPacket = Object.create(this.options);
connectPacket.cmd = 'connect';
// avoid message queue
sendPacket(this, connectPacket);
// Echo connection errors
parser.on('error', this.emit.bind(this, 'error'));
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
this.stream.setMaxListeners(1000);
clearTimeout(this.connackTimer);
this.connackTimer = setTimeout(function () {
that._cleanUp(true);
}, this.options.connectTimeout);
};
MqttClient.prototype._handlePacket = function (packet, done) {
switch (packet.cmd) {
case 'publish':
this._handlePublish(packet, done);
break;
case 'puback':
case 'pubrec':
case 'pubcomp':
case 'suback':
case 'unsuback':
this._handleAck(packet);
done();
break;
case 'pubrel':
this._handlePubrel(packet, done);
break;
case 'connack':
this._handleConnack(packet);
done();
break;
case 'pingresp':
this._handlePingresp(packet);
done();
break;
default:
// do nothing
// maybe we should do an error handling
// or just log it
break;
}
// When a packet is received, reschedule the ping timer
this._shiftPingInterval();
};
MqttClient.prototype._checkDisconnecting = function (callback) {
if (this.disconnecting) {
if (callback) {
callback(new Error('client disconnecting'));
} else {
this.emit(new Error('client disconnecting'));
}
}
return this.disconnecting;
};
/**
* publish - publish <message> to <topic>
*
* @param {String} topic - topic to publish to
* @param {String, Buffer} message - message to publish
* @param {Object} [opts] - publish options, includes:
* {Number} qos - qos level to publish on
* {Boolean} retain - whether or not to retain the message
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.publish('topic', 'message');
* @example
* client.publish('topic', 'message', {qos: 1, retain: true});
* @example client.publish('topic', 'message', console.log);
*/
MqttClient.prototype.publish = function (topic, message, opts, callback) {
var packet;
// .publish(topic, payload, cb);
if ('function' === typeof opts) {
callback = opts;
opts = null;
}
// Default opts
if (!opts) {
opts = {qos: 0, retain: false};
}
if (this._checkDisconnecting(callback)) {
return this;
}
packet = {
cmd: 'publish',
topic: topic,
payload: message,
qos: opts.qos,
retain: opts.retain,
messageId: this._nextId()
};
switch (opts.qos) {
case 1:
case 2:
// Add to callbacks
this.outgoing[packet.messageId] = callback || nop;
this._sendPacket(packet);
break;
default:
this._sendPacket(packet, callback);
break;
}
return this;
};
/**
* subscribe - subscribe to <topic>
*
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
* @param {Object} [opts] - optional subscription options, includes:
* {Number} qos - subscribe qos level
* @param {Function} [callback] - function(err, granted){} where:
* {Error} err - subscription error (none at the moment!)
* {Array} granted - array of {topic: 't', qos: 0}
* @returns {MqttClient} this - for chaining
* @api public
* @example client.subscribe('topic');
* @example client.subscribe('topic', {qos: 1});
* @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
* @example client.subscribe('topic', console.log);
*/
MqttClient.prototype.subscribe = function () {
var packet,
args = Array.prototype.slice.call(arguments),
subs = [],
obj = args.shift(),
callback = args.pop() || nop,
opts = args.pop();
if ('string' === typeof obj) {
obj = [obj];
}
if (this._checkDisconnecting(callback)) {
return this;
}
if ('function' !== typeof callback) {
opts = callback;
callback = nop;
}
if (!opts) {
opts = { qos: 0 };
}
if (Array.isArray(obj)) {
obj.forEach(function (topic) {
subs.push({
topic: topic,
qos: opts.qos
});
});
} else {
Object
.keys(obj)
.forEach(function (k) {
subs.push({
topic: k,
qos: obj[k]
});
});
}
packet = {
cmd: 'subscribe',
subscriptions: subs,
qos: 1,
retain: false,
dup: false,
messageId: this._nextId()
};
this.outgoing[packet.messageId] = callback;
this._sendPacket(packet);
return this;
};
/**
* unsubscribe - unsubscribe from topic(s)
*
* @param {String, Array} topic - topics to unsubscribe from
* @param {Function} [callback] - callback fired on unsuback
* @returns {MqttClient} this - for chaining
* @api public
* @example client.unsubscribe('topic');
* @example client.unsubscribe('topic', console.log);
*/
MqttClient.prototype.unsubscribe = function (topic, callback) {
var packet = {
cmd: 'unsubscribe',
qos: 1,
messageId: this._nextId()
};
callback = callback || nop;
if (this._checkDisconnecting(callback)) {
return this;
}
if ('string' === typeof topic) {
packet.unsubscriptions = [topic];
} else if ('object' === typeof topic && topic.length) {
packet.unsubscriptions = topic;
}
this.outgoing[packet.messageId] = callback;
this._sendPacket(packet);
return this;
};
/**
* end - close connection
*
* @returns {MqttClient} this - for chaining
* @param {Boolean} force - do not wait for all in-flight messages to be acked
* @param {Function} cb - called when the client has been closed
*
* @api public
*/
MqttClient.prototype.end = function (force, cb) {
var that = this;
if ('function' === typeof force) {
cb = force;
force = false;
}
function closeStores () {
that.incomingStore.close(function () {
that.outgoingStore.close(cb);
});
}
function finish () {
that._cleanUp(force, closeStores);
}
if (this.disconnecting) {
return true;
}
this.disconnecting = true;
if (!force && 0 < Object.keys(this.outgoing).length) {
// wait 10ms, just to be sure we received all of it
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10));
} else {
finish();
}
return this;
};
/**
* _reconnect - implement reconnection
* @api privateish
*/
MqttClient.prototype._reconnect = function () {
this.emit('reconnect');
this._setupStream();
};
/**
* _setupReconnect - setup reconnect timer
*/
MqttClient.prototype._setupReconnect = function () {
var that = this;
if (!that.disconnecting && !that.reconnectTimer && (0 < that.options.reconnectPeriod)) {
this.emit('offline');
that.reconnectTimer = setInterval(function () {
that._reconnect();
}, that.options.reconnectPeriod);
}
};
/**
* _clearReconnect - clear the reconnect timer
*/
MqttClient.prototype._clearReconnect = function () {
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer);
this.reconnectTimer = false;
}
};
/**
* _cleanUp - clean up on connection end
* @api private
*/
MqttClient.prototype._cleanUp = function (forced, done) {
if (done) {
this.stream.on('close', done);
}
if (forced) {
this.stream.destroy();
} else {
this._sendPacket(
{ cmd: 'disconnect' },
setImmediate.bind(
null,
this.stream.end.bind(this.stream)
)
);
}
if (this.reconnectTimer) {
this._clearReconnect();
this._setupReconnect();
}
if (null !== this.pingTimer) {
this.pingTimer.clear();
this.pingTimer = null;
}
};
/**
* _sendPacket - send or queue a packet
* @param {String} type - packet type (see `protocol`)
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @api private
*/
MqttClient.prototype._sendPacket = function (packet, cb) {
if (!this.connected) {
return this.queue.push({ packet: packet, cb: cb });
}
switch (packet.qos) {
case 2:
case 1:
storeAndSend(this, packet, cb);
break;
/**
* no need of case here since it will be caught by default
* and jshint comply that before default it must be a break
* anyway it will result in -1 evaluation
*/
case 0:
/* falls through */
default:
sendPacket(this, packet, cb);
break;
}
};
/**
* _setupPingTimer - setup the ping timer
*
* @api private
*/
MqttClient.prototype._setupPingTimer = function () {
var that = this;
if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true;
this.pingTimer = reInterval(function () {
that._checkPing();
}, this.options.keepalive * 1000);
}
};
/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
*/
MqttClient.prototype._shiftPingInterval = function () {
if (this.pingTimer && this.options.keepalive) {
this.pingTimer.reschedule(this.options.keepalive * 1000);
}
};
/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
MqttClient.prototype._checkPing = function () {
if (this.pingResp) {
this.pingResp = false;
this._sendPacket({ cmd: 'pingreq' });
} else {
// do a forced cleanup since socket will be in bad shape
this._cleanUp(true);
}
};
/**
* _handlePingresp - handle a pingresp
*
* @api private
*/
MqttClient.prototype._handlePingresp = function () {
this.pingResp = true;
};
/**
* _handleConnack
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleConnack = function (packet) {
var rc = packet.returnCode,
// TODO: move to protocol
errors = [
'',
'Unacceptable protocol version',
'Identifier rejected',
'Server unavailable',
'Bad username or password',
'Not authorized'
];
clearTimeout(this.connackTimer);
if (0 === rc) {
this.emit('connect', packet);
} else if (0 < rc) {
this.emit('error',
new Error('Connection refused: ' + errors[rc]));
}
};
/**
* _handlePublish
*
* @param {Object} packet
* @api private
*/
/*
those late 2 case should be rewrite to comply with coding style:
case 1:
case 0:
// do not wait sending a puback
// no callback passed
if (1 === qos) {
this._sendPacket({
cmd: 'puback',
messageId: mid
});
}
// emit the message event for both qos 1 and 0
this.emit('message', topic, message, packet);
this.handleMessage(packet, done);
break;
default:
// do nothing but every switch mus have a default
// log or throw an error about unknown qos
break;
for now i just suppressed the warnings
*/
MqttClient.prototype._handlePublish = function (packet, done) {
var topic = packet.topic.toString(),
message = packet.payload,
qos = packet.qos,
mid = packet.messageId,
that = this;
switch (qos) {
case 2:
this.incomingStore.put(packet, function () {
that._sendPacket({cmd: 'pubrec', messageId: mid}, done);
});
break;
case 1:
// do not wait sending a puback
// no callback passed
this._sendPacket({
cmd: 'puback',
messageId: mid
});
/* falls through */
case 0:
// emit the message event for both qos 1 and 0
this.emit('message', topic, message, packet);
this.handleMessage(packet, done);
break;
default:
// do nothing
// log or throw an error about unknown qos
break;
}
};
/**
* Handle messages with backpressure support, one at a time.
* Override at will.
*
* @param Packet packet the packet
* @param Function callback call when finished
* @api public
*/
MqttClient.prototype.handleMessage = function (packet, callback) {
callback();
};
/**
* _handleAck
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleAck = function (packet) {
var mid = packet.messageId,
type = packet.cmd,
response = null,
cb = this.outgoing[mid],
that = this;
if (!cb) {
// Server sent an ack in error, ignore it.
return;
}
// Process
switch (type) {
case 'pubcomp':
// same thing as puback for QoS 2
case 'puback':
// Callback - we're done
delete this.outgoing[mid];
this.outgoingStore.del(packet, cb);
break;
case 'pubrec':
response = {
cmd: 'pubrel',
qos: 2,
messageId: mid
};
this._sendPacket(response);
break;
case 'suback':
delete this.outgoing[mid];
this.outgoingStore.del(packet, function (err, original) {
var i,
origSubs = original.subscriptions,
granted = packet.granted;
if (err) {
// missing packet, what should we do?
return that.emit('error', err);
}
for (i = 0; i < granted.length; i += 1) {
origSubs[i].qos = granted[i];
}
cb(null, origSubs);
});
break;
case 'unsuback':
delete this.outgoing[mid];
this.outgoingStore.del(packet, cb);
break;
default:
that.emit('error', new Error('unrecognized packet type'));
}
if (this.disconnecting &&
0 === Object.keys(this.outgoing).length) {
this.emit('outgoingEmpty');
}
};
/**
* _handlePubrel
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handlePubrel = function (packet, callback) {
var mid = packet.messageId,
that = this;
that.incomingStore.get(packet, function (err, pub) {
if (err) {
return that.emit('error', err);
}
if ('pubrel' !== pub.cmd) {
that.emit('message', pub.topic, pub.payload, pub);
that.incomingStore.put(packet);
}
that._sendPacket({cmd: 'pubcomp', messageId: mid}, callback);
});
};
/**
* _nextId
*/
MqttClient.prototype._nextId = function () {
var id = this.nextId++;
// Ensure 16 bit unsigned int:
if (65535 === id) {
this.nextId = 1;
}
return id;
};
module.exports = MqttClient;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment