Created
December 12, 2016 07:05
-
-
Save xizhibei/4b97f56690dc3bc18cda740544fb63a1 to your computer and use it in GitHub Desktop.
Bunyan amqp logstash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const bunyan = require('bunyan'); | |
const Promise = require('bluebird'); | |
const amqp = require('amqplib'); | |
const os = require('os'); | |
const CBuffer = require('CBuffer'); | |
const util = require('util'); | |
const EventEmitter = require('events').EventEmitter; | |
// Maps Bunyan's numeric log levels (10, 20, ... 60) to their names.
// The names are listed in ascending severity; position i maps to level (i + 1) * 10.
const levels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal']
  .reduce((table, levelName, position) => {
    table[(position + 1) * 10] = levelName;
    return table;
  }, {});
/**
 * Factory wrapper around the LogstashStream constructor.
 *
 * @param {Object} [options] - passed through unchanged to LogstashStream.
 * @returns {LogstashStream} a newly constructed stream.
 */
function createLogstashStream(options) {
  const stream = new LogstashStream(options);
  return stream;
}
/**
 * Event-emitting stream that forwards log records to an AMQP exchange.
 *
 * Every option falls back to its default when the supplied value is falsy.
 *
 * @param {Object} [options]
 * @param {string} [options.url] - AMQP URL handed to amqp.connect().
 * @param {string} [options.level='info']
 * @param {string} [options.server] - defaults to os.hostname().
 * @param {string} [options.application] - defaults to process.title.
 * @param {number} [options.pid] - defaults to process.pid.
 * @param {string[]} [options.tags=['bunyan']]
 * @param {string} [options.type]
 * @param {number} [options.bufferSize=1000] - capacity of the pending-message buffer.
 * @param {Function} [options.messageFormatter] - optional hook applied to each message in write().
 * @param {string} [options.exchange] - exchange name used when publishing.
 * @param {string} [options.queue] - queue name bound to the exchange on connect.
 */
function LogstashStream(options) {
  EventEmitter.call(this);

  const opts = options || {};

  // Public configuration, assigned in one pass.
  Object.assign(this, {
    name: 'bunyan',
    url: opts.url,
    level: opts.level || 'info',
    server: opts.server || os.hostname(),
    application: opts.application || process.title,
    pid: opts.pid || process.pid,
    tags: opts.tags || ['bunyan'],
    type: opts.type,
    cbufferSize: opts.bufferSize || 1000,
    messageFormatter: opts.messageFormatter,
  });

  // Internal AMQP state.
  this._exchange = opts.exchange;
  this._queue = opts.queue;
  // Holds messages submitted via send() while no channel is available.
  this._logBuffer = new CBuffer(this.cbufferSize);
  this._connected = false;
}

util.inherits(LogstashStream, EventEmitter);
/**
 * Returns the cached AMQP connection, or opens a new one to `this.url`.
 *
 * When the connection emits 'error' or 'close', the cached connection and
 * channel are dropped and the event is re-emitted on this stream.
 *
 * @returns {Promise} resolves with the connection, both when it was cached
 *   and when it was freshly opened.
 */
LogstashStream.prototype._getConnection = function () {
  if (this._connection) return Promise.resolve(this._connection);

  return amqp.connect(this.url)
    .then((conn) => {
      // Clear cached state only if it still belongs to this connection, so a
      // late event from an old connection cannot clobber a newer one.
      const forget = () => {
        if (this._connection !== conn) return;
        this._channel = null;
        this._connected = false;
        this._connection = null;
      };

      // State is reset BEFORE re-emitting: emit('error') throws when nobody
      // listens, and the dead connection must not stay cached in that case.
      conn.on('error', (e) => {
        forget();
        this.emit('error', e);
      });
      conn.on('close', () => {
        forget();
        this.emit('close');
      });

      this._connection = conn;
      return conn;
    });
};
/**
 * Opens a channel, declares the exchange/queue/binding, then marks the stream
 * connected, emits 'connect' and flushes buffered messages.
 *
 * A call made while already connecting or connected is a no-op.
 *
 * @returns {Promise|undefined} undefined for the no-op case; otherwise a
 *   promise that settles once setup has finished and rejects if it failed.
 */
LogstashStream.prototype.connect = function connect() {
  if (this._connecting || this._connected) return;
  this._connecting = true;

  return this._getConnection()
    .then(() => this._connection.createChannel())
    .then((ch) => {
      // Forget the channel once it is gone so send() goes back to buffering
      // instead of publishing on a dead channel.
      const dropChannel = () => {
        if (this._channel !== ch) return;
        this._channel = null;
        this._connected = false;
      };
      ch.on('close', dropChannel);
      ch.on('error', (err) => {
        dropChannel();
        this.emit('error', err);
      });

      return Promise.all([
        ch.assertExchange(this._exchange, 'topic', {durable: true}),
        ch.assertQueue(this._queue, {durable: true, exclusive: false}),
        ch.bindQueue(this._queue, this._exchange, '*.*.*'),
      ]).then(() => ch);
    })
    .then((ch) => {
      // The channel is exposed only after the topology exists; until then
      // send() keeps buffering rather than publishing to an undeclared exchange.
      this._channel = ch;
      this._connected = true;
      this._connecting = false;
      this.emit('connect');
      this.flush();
    })
    .catch((err) => {
      // Release the guard so a later send()/connect() can retry; without this
      // a single failed attempt would block reconnecting forever.
      this._connecting = false;
      throw err;
    });
};
/**
 * Converts one log record into a Logstash-style JSON message and hands it to
 * send() under the routing key `<server>.<application>.<level>`.
 *
 * @param {Object|string} entry - a log record object, or its JSON string form.
 *   The input object is copied, never mutated.
 * @returns {undefined} Nothing is sent when `messageFormatter` returns
 *   null or undefined.
 * @throws {SyntaxError} if `entry` is a string that is not valid JSON.
 */
LogstashStream.prototype.write = function logstashWrite(entry) {
  const rec = typeof entry === 'string'
    ? JSON.parse(entry)
    : Object.assign({}, entry);

  // Translate known numeric levels to their names; unknown levels pass through.
  let level = rec.level;
  if (Object.prototype.hasOwnProperty.call(levels, level)) {
    level = levels[level];
  }

  // `time` is a Date on raw records but a string after JSON.parse, so it is
  // normalised through Date. A missing or unparsable time falls back to "now"
  // instead of throwing.
  let time = rec.time instanceof Date ? rec.time : new Date(rec.time);
  if (Number.isNaN(time.getTime())) {
    time = new Date();
  }

  let msg = {
    level,
    '@timestamp': time.toISOString(),
    message: rec.msg,
    tags: this.tags,
    source: `${this.server}/${this.application}`,
  };

  if (typeof this.type === 'string' && this.type) {
    msg.type = this.type;
  }

  // These fields were already mapped above (or are Bunyan bookkeeping);
  // every remaining record field is merged into the message as-is.
  delete rec.time;
  delete rec.msg;
  delete rec.v;
  delete rec.level;
  rec.pid = this.pid;

  msg = Object.assign({}, msg, rec);

  if (this.messageFormatter) {
    msg = this.messageFormatter(msg);
    if (msg === undefined || msg === null) {
      return;
    }
  }

  // '.' separates routing-key words, so EVERY dot inside a segment must be
  // replaced (a string pattern would only replace the first one). String()
  // covers numeric levels that are not in the `levels` table.
  const toSegment = (value) => String(value).replace(/\./g, '-');
  const routingKey = [this.server, this.application, level]
    .map(toSegment)
    .join('.');

  this.send(routingKey, JSON.stringify(msg, bunyan.safeCycles()));
};
/**
 * Publishes buffered messages through send(), oldest first.
 *
 * Stops as soon as no channel is available: send() would only put the entry
 * back into the buffer, so continuing would never terminate. Entries not yet
 * sent stay buffered for the next flush.
 */
LogstashStream.prototype.flush = function () {
  while (this._channel) {
    // shift() takes from the front of the buffer, preserving the order in
    // which the messages were logged.
    const entry = this._logBuffer.shift();
    if (!entry) break;
    this.send(entry.routingKey, entry.message);
  }
};
/**
 * Publishes a message on the current channel, or buffers it and starts a
 * connection attempt when there is no channel.
 *
 * @param {string} routingKey - routing key to publish under.
 * @param {string} message - serialized message body.
 * @returns {*} the return value of channel.publish() when published;
 *   undefined when the message was buffered.
 */
LogstashStream.prototype.send = function logstashSend(routingKey, message) {
  if (this._channel) {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    return this._channel.publish(this._exchange, routingKey, Buffer.from(message));
  }

  this._logBuffer.push({routingKey: routingKey, message: message});

  // connect() returns undefined when an attempt is already in flight.
  // Otherwise report a failed attempt through the stream's 'error' event
  // rather than leaving the promise floating.
  const attempt = this.connect();
  if (attempt && typeof attempt.catch === 'function') {
    attempt.catch((err) => {
      this.emit('error', err);
    });
  }
};
module.exports = { | |
createStream: createLogstashStream, | |
LogstashStream: LogstashStream | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment