Skip to content

Instantly share code, notes, and snippets.

@iamjochem
Created September 16, 2017 15:47
Show Gist options
  • Save iamjochem/c6d74c4ed916acab56973d7b8f0a0697 to your computer and use it in GitHub Desktop.
Save iamjochem/c6d74c4ed916acab56973d7b8f0a0697 to your computer and use it in GitHub Desktop.
[NodeJS:bunyan,rabbitmq] a custom bunyan stream for publishing logs messages to amq/rabbitmq
// Bunyan AMQP Stream Transport for ES6
'use strict';
// 3rd party deps
const AMQP = require('amqplib/callback_api');
const stringify = require('json-stringify-safe');
const mydebug = require('debug')('bunyan:amq');
// our deps
const genTopic = require('./gentopic'); // used to generate a RabbitMQ topic/routing-key
const loglevels = require('./log_levels'); // used to translates Bunyan integer to levels to strings to be used as AMQP topics
// locals
const trueP = Promise.resolve(true);
const falseP = Promise.resolve(false);
const MAX_Q_LEN = 50000; // max length of unpublished messages queue ()
// expose the module!
module.exports = (options) => new BunyanAMQPTransport(options);
class BunyanAMQPTransport
{
constructor(options)
{
this.total_received = 0;
this.total_published = 0;
this.unpublished = [];
this.exchange = options.exchange ? options.exchange : 'bunyan_logs';
/**
* getch() : get channel
*
* @return {Promise} - promise for a channel object
*/
this.getch = ((dns) => {
let conn_to, conref, chnref, prom, reconnect_cnt = 0, shutting_down = false;
const myself = this;
function close(shutdown_fn)
{
if (myself.running || myself.flooded || myself.unpublished.length) {
mydebug('delaying closing connection, messages still in-flight');
return setTimeout(() => close(shutdown_fn), 250);
}
const done = typeof shutdown_fn === 'function' ? shutdown_fn : () => {};
const doDone = () => { mydebug(`received ${myself.total_received} & published ${myself.total_published} log messages during process life-time`); done(); };
const doCloseCon = () => (conref ? conref.close(doDone) : doDone());
const doClose = () => (chnref ? chnref.close(doCloseCon) : doCloseCon());
if (myself.last_flush) {
const offset = Math.max(myself.last_flush_count, 20) * 5,
now = +new Date
;
// mydebug(`last_flush = ${myself.last_flush}, now = ${now}, offset = ${offset}, myself.last_flush < now - offset = ${myself.last_flush < now - offset}`);
if (myself.last_flush > now - offset) {
mydebug(myself.last_flush_count ? `last flushed ${myself.last_flush_count} log messages ${now - myself.last_flush} seconds ago ... waiting ${offset} ms before closing AMQ connection.`
: `last flushed log messages ${now - myself.last_flush} seconds ago ... waiting ${offset} ms before closing AMQ connection`);
return setTimeout(() => doClose(), offset);
}
}
doClose();
}
function reset(err)
{
// TODO: better way to handle potential error in this context?
// (bare in mind this is an error inside of a logger stream transport!?)
if (err && !options.ignore_errors)
console.error(err); // eslint-disable-line no-console
close();
if (shutting_down)
return; // no reseting of connection ref is shutting down - we want to possible hang 'destroy' event handlers on it
conref = null;
chnref = null;
prom = null;
reconnect_cnt += 1;
}
function reset_throw(err)
{
reset(err); throw err;
}
return (shutdown) => {
if (shutdown && !shutting_down) {
shutting_down = true;
close(shutdown);
}
if (shutting_down)
return Promise.resolve( chnref );
if (!prom) {
prom = new Promise((resolve, reject) => {
if (conn_to)
clearTimeout(conn_to);
conn_to = setTimeout(() => {
AMQP.connect(dns, {}, (e, o) => { if (e) reject(e); else resolve(o); });
}, Math.min(5000, reconnect_cnt * 1000));
});
prom = prom .then ( con => (conref = con).createChannel() )
.then ( chn => (chnref = chn).assertExchange(this.exchange, 'topic', { durable: true }) )
.then ( () => reconnect_cnt = 0 )
.then ( () => {
const block = () => this.flooded = true;
const drain = () => this.flush(chnref, true);
conref.on('error', reset);
chnref.on('error', reset);
chnref.on('drain', drain);
// RabbitMQ specific extension (see here: http://www.rabbitmq.com/connection-blocked.html)
chnref.on('blocked', block);
chnref.on('unblocked', drain);
})
.then ( () => chnref )
.catch( reset_throw );
}
return prom;
};
})(genDSN(options));
}
flush(channel, drained)
{
if (!channel)
return falseP;
if (this.running)
return falseP;
if (drained === true) {
this.last_flush_count = null;
this.flooded = false;
}
if (this.flooded || !this.unpublished.length)
return falseP;
if (this.unpublished.length > MAX_Q_LEN) {
mydebug(`max queue length exceeded, dropping ${this.unpublished.length - MAX_Q_LEN} message(s).`);
this.unpublished = this.unpublished.slice(0, MAX_Q_LEN);
}
this.running = true;
{
const c = this.unpublished.length,
m = this.unpublished[c - 1],
r = channel.publish(this.exchange, m[0], m[1], { contentType : 'application/json' })
;
this.running = false;
this.last_flush = +new Date;
if (r === false) {
this.flooded = true;
return falseP;
}
this.total_published += 1;
this.unpublished.pop();
if (this.unpublished.length)
return this.flush(channel).then(ok => {
this.last_flush_count = c;
return ok;
});
return trueP;
}
}
write(message)
{
this.total_received += 1;
const topic = genTopic(loglevels.nearest_lvlstr(message.level), message.name, message.hostname);
// TODO: a better ring buffer?
this.unpublished.unshift([ topic, new Buffer(stringify(message, null, null)) ]);
this.getch().then ( (c) => this.flush(c) ) // eslint-disable-next-line no-console
.catch( (e) => console.log('Failed to publish application log message to "%s" on AMQ exchange "%s"', topic, this.exchange, e) )
;
}
close(done)
{
// at this stage we must swallow the error (alternative is to loop forever - which is not very helpful either)
const finish = (err) => {
if (this.unpublished.length)
this.unpublished.forEach(m => {
const s = m[1];
// ignore unpublished info messages
if (s.level === 30) // eslint-disable-next-line no-console
console.error(s);
});
if (err) // eslint-disable-next-line no-console
console.error(err);
this.getch(done).catch(done);
};
return this.getch().then((c) => {
if (!this.unpublished.length)
return finish();
return this.flush(c).then(function(ok) {
if (ok)
throw new Error('failed to [completely] flush AMQ logging-connection, waiting a little while to try one more time.');
return finish();
}).catch(() => {
setTimeout(() => {
if (!c)
return finish();
this.flush(c).then(finish).catch(finish);
}, 1200).unref();
});
});
}
}
function genDSN(opts)
{
let dsn;
const options = opts || {};
if (options.dsn) {
dsn = options.dsn;
} else {
const usr = options.username || 'guest';
const pwd = options.password || 'guest';
const host = options.host ? options.host : '127.0.0.1';
const port = options.port ? options.port : 5672;
const vhost = options.vhost ? '/' + options.vhost : '';
dsn = `amqp://${usr}:${pwd}@${host}:${port}${vhost}`;
}
return dsn;
}
module.exports = {
// separator char for amq topic/routing-key segments
TOPIC_SEGMENT_SEPARATOR : '.',
};
// 3rd party deps
const isarr = require('lodash/isArray');
const isbool = require('lodash/isBoolean');
const isnum = require('lodash/isNumber');
// our deps
const constants = require('./constants');
// locals
const host_name = require('os').hostname();
/**
* predefined filtering function for topic-part arrays,
* filters empty values
*
* @type {Function}
*/
const filter_tparts = require('lodash/partialRight')(require('lodash/filter'), require('lodash/trim'));
/**
* cleans an individual topic segment, to ensure that
* the segment separator char is replaced in each topic segment value
* and the value is always lower case
*
* @type {Function}
*/
const tpart_clean = (s) => s.toLowerCase().replace(constants.TOPIC_SEGMENT_SEPARATOR, '');
/**
* helper for build a topic string from an array of topic segments
*
* @param {Array} parts
* @return {String}
* @throws {Error} - throw is `parts` is not an array or contains less than 2 valid values
*/
function build_topic_str(parts)
{
let filtered;
if (!isarr(parts) || ((filtered = filter_tparts(parts)).length < 2))
throw new Error(`Invalid topic parts given (${filtered.join(',')}), cannot generate a messaging topic string`);
return filtered.map( tpart_clean ).join(constants.TOPIC_SEGMENT_SEPARATOR);
}
module.exports = function(lvl, name, host, ...args) {
return build_topic_str([lvl || 'error', name, host || host_name, ...args]);
};
// 3rd party deps
const invert = require('lodash/invert');
const isstr = require('lodash/isString');
const isfunc = require('lodash/isFunction');
const isnum = require('lodash/isNumber');
const clone = require('lodash/clone');
const find = require('lodash/find');
const keys = require('lodash/keys');
const equals = require('lodash/eq');
const negate = require('lodash/negate');
const mapvals = require('lodash/mapValues');
const comparators = [
{ fn : equals, operators : ['eq', '=', '==', '==='] },
{ fn : negate(equals), operators : ['neq', '!=', '!==', 'not'] },
{ fn : require('lodash/lt'), operators : ['lt', '<'] },
{ fn : require('lodash/lte'), operators : ['lte', '<='] },
{ fn : require('lodash/gt'), operators : ['gt', '>'] },
{ fn : require('lodash/gte'), operators : ['gte', '>='] },
];
// locals
const default_lvl = 'info';
const level_map = {
10 : 'trace',
20 : 'debug',
30 : 'info',
40 : 'warn',
50 : 'error',
60 : 'fatal',
};
const level_rmap = mapvals(invert(level_map), v => parseInt(v, 10));
// export module!
module.exports = {
levels : reversed => clone(reversed ? level_rmap : level_map),
default_level : () => default_lvl,
nearest_lvlstr : v => itos(nearest_lvl(v)),
itos,
stoi,
compare,
nearest_lvl,
};
function itos(v)
{
return (isnum(v) && v >= 0 && level_map[ v + '' ]) || null;
}
function stoi(v)
{
return isstr(v) && level_rmap.hasOwnProperty(v.toLowerCase()) ? level_rmap[ v.toLowerCase() ] : null;
}
function compare(a, b, comparator)
{
const a_val = nearest_lvl(a),
b_val = nearest_lvl(b),
fn = determine_comparator_fn(comparator)
;
return isfunc(fn) ? fn.call(null, a_val, b_val) : false;
}
function determine_comparator_fn(comp)
{
if (isfunc(comp))
return comp;
if (isstr(comp)) {
const c = comp.trim().toLowerCase();
let i = comparators.length;
while (i-- > -1) {
const item = comparators[ i ];
if (item.operators.indexOf(c) !== -1)
return item.fn;
}
}
return equals;
}
function nearest_lvl(input)
{
const v = isnum(input) ? input : stoi(input || default_lvl);
if (level_map[ v ])
return v;
const lvl_vals = keys(level_map);
return parseInt(find(lvl_vals, (lvl, i, c) => (v <= lvl) || (i === c.length - 1)) || stoi(default_lvl), 10);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment