Skip to content

Instantly share code, notes, and snippets.

@xizhibei
Created December 12, 2016 07:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xizhibei/4b97f56690dc3bc18cda740544fb63a1 to your computer and use it in GitHub Desktop.
Save xizhibei/4b97f56690dc3bc18cda740544fb63a1 to your computer and use it in GitHub Desktop.
Bunyan AMQP Logstash
'use strict';
const bunyan = require('bunyan');
const Promise = require('bluebird');
const amqp = require('amqplib');
const os = require('os');
const CBuffer = require('CBuffer');
const util = require('util');
const EventEmitter = require('events').EventEmitter;
/**
 * Lookup table from the numeric `level` found on an incoming log record to
 * the level name that is written into the outgoing message and used as the
 * last word of the routing key.
 */
const levels = {
  10: 'trace',
  20: 'debug',
  30: 'info',
  40: 'warn',
  50: 'error',
  60: 'fatal',
};
/**
 * Factory wrapper around the LogstashStream constructor.
 *
 * @param {Object} [options] - Passed through unchanged to LogstashStream.
 * @returns {LogstashStream} A newly constructed stream.
 */
function createLogstashStream(options) {
  const stream = new LogstashStream(options);
  return stream;
}
/**
 * Event-emitting stream that turns log records into Logstash-style messages
 * and publishes them to an AMQP exchange. Records written while no channel is
 * available are held in a fixed-size circular buffer.
 *
 * @param {Object} [options]
 * @param {string} [options.url] - AMQP URL handed to `amqp.connect`.
 * @param {string} [options.level='info'] - Stored on the stream as `level`.
 * @param {string} [options.server] - Host label; defaults to `os.hostname()`.
 * @param {string} [options.application] - Defaults to `process.title`.
 * @param {number} [options.pid] - Defaults to `process.pid`.
 * @param {string[]} [options.tags=['bunyan']] - Tags copied onto each message.
 * @param {string} [options.type] - Optional `type` field for each message.
 * @param {number} [options.bufferSize=1000] - Capacity of the pending buffer.
 * @param {Function} [options.messageFormatter] - Hook that may rewrite (or, by
 *   returning null/undefined, suppress) a message before it is sent.
 * @param {string} [options.exchange] - Exchange to publish to.
 * @param {string} [options.queue] - Queue that gets bound to the exchange.
 */
function LogstashStream(options) {
  EventEmitter.call(this);

  const opts = options || {};

  // Public settings, assigned in a fixed order.
  Object.assign(this, {
    name: 'bunyan',
    url: opts.url,
    level: opts.level || 'info',
    server: opts.server || os.hostname(),
    application: opts.application || process.title,
    pid: opts.pid || process.pid,
    tags: opts.tags || ['bunyan'],
    type: opts.type,
    cbufferSize: opts.bufferSize || 1000,
    messageFormatter: opts.messageFormatter,
  });

  // Internal AMQP / buffering state.
  this._exchange = opts.exchange;
  this._queue = opts.queue;
  this._logBuffer = new CBuffer(this.cbufferSize);
  this._connected = false;
}

util.inherits(LogstashStream, EventEmitter);
/**
 * Returns the AMQP connection, opening it with `amqp.connect(this.url)` when
 * none is cached on the instance.
 *
 * The connection's 'error' and 'close' events clear the cached connection and
 * channel, mark the stream as disconnected, and are then re-emitted on this
 * stream as 'error' / 'close'.
 *
 * @returns {Promise} Resolves with the connection object, both when it was
 *   already cached and when it has just been opened.
 */
LogstashStream.prototype._getConnection = function () {
  if (this._connection) return Promise.resolve(this._connection);

  return amqp.connect(this.url).then((conn) => {
    // Forget the dead connection *before* notifying listeners: emit('error')
    // throws when nobody listens, and a half-reset instance would keep handing
    // out the broken connection from the cache above.
    const forgetConnection = () => {
      this._channel = null;
      this._connected = false;
      this._connection = null;
    };

    this._connection = conn;

    conn.on('error', (e) => {
      forgetConnection();
      this.emit('error', e);
    });
    conn.on('close', () => {
      forgetConnection();
      this.emit('close');
    });

    // Resolve with the connection so both code paths yield the same value.
    return conn;
  });
};
/**
 * Opens a channel on the AMQP connection, declares the durable topic exchange
 * and durable queue, binds the queue with the pattern `*.*.*`, then marks the
 * stream connected, emits 'connect' and flushes buffered messages.
 *
 * @returns {Promise|undefined} `undefined` when a connect attempt is already
 *   in flight or the stream is already connected; otherwise a promise that
 *   settles when setup has finished. On failure the promise rejects with the
 *   original error after the in-progress flag has been cleared, so a later
 *   call can retry.
 */
LogstashStream.prototype.connect = function connect() {
  if (this._connecting || this._connected) return undefined;
  this._connecting = true;

  return this._getConnection()
    .then(() => this._connection.createChannel())
    .then((ch) => {
      this._channel = ch;
      return Promise.all([
        ch.assertExchange(this._exchange, 'topic', { durable: true }),
        ch.assertQueue(this._queue, { durable: true, exclusive: false }),
        ch.bindQueue(this._queue, this._exchange, '*.*.*'),
      ]);
    })
    .then(() => {
      this._connected = true;
      this._connecting = false;
      this.emit('connect');
      this.flush();
    })
    .catch((err) => {
      // Without this reset a single failed attempt would leave `_connecting`
      // stuck at true and every later connect() would be a silent no-op.
      this._connecting = false;
      if (!this._connected) {
        // Setup never completed: do not let send() publish on this channel.
        this._channel = null;
      }
      throw err;
    });
};
/**
 * Converts one log record into a Logstash-style message and hands it to
 * `send` with a three-word routing key `<server>.<application>.<level>`.
 *
 * The record's `time`, `msg`, `v` and `level` fields are folded into
 * `@timestamp`, `message` and `level`; every remaining field is copied onto
 * the message as-is, and `pid` is set from the stream. The caller's object is
 * never mutated.
 *
 * @param {string|Object} entry - A JSON string or a record object. `time` may
 *   be a Date or anything `new Date()` can parse (JSON-encoded records carry
 *   it as a string); a missing or unparsable time falls back to "now".
 * @returns {undefined} Nothing; the message is dropped when
 *   `messageFormatter` returns null or undefined.
 * @throws {SyntaxError} When `entry` is a string that is not valid JSON.
 */
LogstashStream.prototype.write = function logstashWrite(entry) {
  const rec = typeof entry === 'string' ? JSON.parse(entry) : Object.assign({}, entry);

  // Known numeric levels become names; anything else is passed through.
  let level = rec.level;
  if (Object.prototype.hasOwnProperty.call(levels, level)) {
    level = levels[level];
  }

  // A parsed JSON record has a string here, so never assume a Date instance.
  let time = rec.time instanceof Date ? rec.time : new Date(rec.time);
  if (rec.time === undefined || rec.time === null || Number.isNaN(time.getTime())) {
    time = new Date();
  }

  let msg = {
    level,
    '@timestamp': time.toISOString(),
    message: rec.msg,
    tags: this.tags,
    source: `${this.server}/${this.application}`,
  };
  if (typeof this.type === 'string' && this.type) {
    msg.type = this.type;
  }

  delete rec.time;
  delete rec.msg;
  delete rec.v;
  delete rec.level;
  rec.pid = this.pid;
  msg = Object.assign({}, msg, rec);

  if (this.messageFormatter) {
    msg = this.messageFormatter(msg);
    if (msg === undefined || msg === null) {
      return;
    }
  }

  // The queue is bound with '*.*.*', i.e. exactly three dot-separated words,
  // so EVERY dot inside a part must be replaced (not just the first one) or
  // the message would never be routed. String() guards non-string levels.
  const routingKey = [this.server, this.application, level]
    .map((part) => String(part).replace(/\./g, '-'))
    .join('.');

  this.send(routingKey, JSON.stringify(msg, bunyan.safeCycles()));
};
/**
 * Sends every buffered message through `send`, oldest first.
 *
 * Entries are taken from the front of the buffer so they are published in the
 * order they were written. Draining stops as soon as no channel is available:
 * `send` would only push the entry straight back into the buffer, which would
 * otherwise spin forever. Anything still buffered is kept for the next flush.
 *
 * @returns {undefined}
 */
LogstashStream.prototype.flush = function () {
  while (this._channel) {
    const pending = this._logBuffer.shift();
    if (!pending) {
      // Fully drained: reset the buffer as before.
      this._logBuffer.empty();
      return;
    }
    this.send(pending.routingKey, pending.message);
  }
};
/**
 * Publishes a message to the exchange, or buffers it and starts connecting
 * when no channel is available yet.
 *
 * @param {string} routingKey - Routing key for the publish call.
 * @param {string} message - Serialized message body.
 * @returns {*} Whatever the channel's `publish` returns when a channel is
 *   available; `undefined` when the message was buffered instead.
 */
LogstashStream.prototype.send = function logstashSend(routingKey, message) {
  if (this._channel) {
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    return this._channel.publish(this._exchange, routingKey, Buffer.from(message));
  }

  // Buffer first so the message is kept even if connecting fails.
  this._logBuffer.push({ routingKey, message });

  // connect() returns undefined when an attempt is already in flight. When it
  // does return a promise, surface a failure as an 'error' event instead of
  // leaving a floating rejected promise behind.
  const attempt = this.connect();
  if (attempt && typeof attempt.catch === 'function') {
    attempt.catch((err) => this.emit('error', err));
  }
  return undefined;
};
module.exports = {
createStream: createLogstashStream,
LogstashStream: LogstashStream
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment