Skip to content

Instantly share code, notes, and snippets.

@icebob
Last active October 18, 2022 16:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save icebob/228498f646822a03a1e7cc14fb0ce856 to your computer and use it in GitHub Desktop.
Save icebob/228498f646822a03a1e7cc14fb0ce856 to your computer and use it in GitHub Desktop.
Loki logger for Moleculer microservices framework

Loki logger for Moleculer microservices framework

It sends Moleculer log messages directly to a Grafana Loki server. I'm using it to send logs from a Moleculer app running on Fly.io to Grafana Cloud.

For Kubernetes, I recommend using Grafana Agent to send the log messages instead, because the Agent adds the relevant Kubernetes pod labels to the messages.

Usage

// moleculer.config.js

require("./backend/libs/loki-logger"); 

module.exports = {
  logger: [
    {
      type: "Console",
      options: {}
    },
    {
      type: "Loki",
      options: {}
    }
  ],
  // ...
};

Options

{
  url: process.env.LOKI_URL || "https://localhost:3100",
  username: process.env.LOKI_USERNAME,
  password: process.env.LOKI_PASSWORD,
  defaultLabels: null, // Object
  objectPrinter: null,
  interval: 10 * 1000,
  maxRows: 100  
}

Labels

  • level: Log level
  • nodeID: Broker nodeID
  • namespace: Broker namespace
  • module: Logger module name e.g.: broker or v2.posts
  • service: Service name (added only when the logger is bound to a service)

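These labels are merged with opts.defaultLabels, which is a plain key-value object (POJO). When a key appears in both, the built-in label listed above takes precedence.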

Result

image

License

The project is available under the MIT license.

Contact

Copyright (c) 2022 MoleculerJS

@MoleculerJS @MoleculerJS

/*
* moleculer
* Copyright (c) 2022 MoleculerJS (https://github.com/moleculerjs/moleculer)
* MIT Licensed
*/
"use strict";
const BaseLogger = require("moleculer").Loggers.Base;
const { register } = require("moleculer").Loggers;
const _ = require("lodash");
const fetch = require("node-fetch");
fetch.Promise = Promise;
const util = require("util");
/**
 * Loki logger for Moleculer.
 *
 * Log entries are buffered in an in-memory queue and uploaded in batches to the
 * Grafana Loki HTTP push endpoint (`<url>/loki/api/v1/push`). A batch is sent
 * every `interval` milliseconds and whenever the queue reaches `maxRows` entries.
 *
 * @class LokiLogger
 * @extends {BaseLogger}
 */
class LokiLogger extends BaseLogger {
	/**
	 * Creates an instance of LokiLogger.
	 *
	 * @param {Object} opts
	 * @param {String} [opts.url] Base URL of the Loki server (defaults to `LOKI_URL` or "https://localhost:3100").
	 * @param {String} [opts.username] Basic-auth user name (defaults to `LOKI_USERNAME`).
	 * @param {String} [opts.password] Basic-auth password (defaults to `LOKI_PASSWORD`).
	 * @param {Object} [opts.defaultLabels] Extra labels attached to every stream.
	 * @param {Function} [opts.objectPrinter] Custom serializer for object arguments.
	 * @param {Number} [opts.interval=10000] Flush period in milliseconds; `0` disables the timer.
	 * @param {Number} [opts.maxRows=100] Queue length that triggers an immediate flush; `0` disables it.
	 * @memberof LokiLogger
	 */
	constructor(opts) {
		super(opts);

		this.opts = _.defaultsDeep(this.opts, {
			url: process.env.LOKI_URL || "https://localhost:3100",
			username: process.env.LOKI_USERNAME,
			password: process.env.LOKI_PASSWORD,
			defaultLabels: null,
			objectPrinter: null,
			interval: 10 * 1000,
			maxRows: 100
		});

		// Entries waiting to be uploaded: { ts, level, msg, bindings }.
		this.queue = [];
		// Handle of the periodic flush timer (null while not running).
		this.timer = null;
	}

	/**
	 * Initialize logger.
	 *
	 * Selects the object printer and starts the periodic flush timer.
	 *
	 * @param {LoggerFactory} loggerFactory
	 */
	init(loggerFactory) {
		super.init(loggerFactory);

		this.objectPrinter = this.opts.objectPrinter
			? this.opts.objectPrinter
			: o =>
					util.inspect(o, {
						showHidden: false,
						depth: 2,
						colors: false,
						// Keep every object on a single line of the log message.
						breakLength: Number.POSITIVE_INFINITY
					});

		if (this.opts.interval > 0) {
			this.timer = setInterval(() => this.flush(), this.opts.interval);
			// The flush timer alone must not keep the Node.js process alive.
			this.timer.unref();
		}
	}

	/**
	 * Stopping logger.
	 *
	 * Cancels the periodic timer and uploads whatever is still queued.
	 *
	 * @returns {Promise} Settles when the final upload attempt has finished.
	 */
	stop() {
		if (this.timer) {
			clearInterval(this.timer);
			this.timer = null;
		}

		return this.flush();
	}

	/**
	 * Generate a new log handler.
	 *
	 * @param {object} bindings Logger bindings; `mod`, `nodeID`, `ns` and `svc` are read.
	 * @returns {Function|null} Handler `(type, args) => void`, or `null` when no
	 *   log level is configured for the module.
	 */
	getLogHandler(bindings) {
		const level = bindings ? this.getLogLevel(bindings.mod) : null;
		if (!level) return null;

		// Objects (arrays and functions included) go through the object printer,
		// primitives are passed on unchanged.
		const printArgs = args => args.map(p => (_.isObject(p) ? this.objectPrinter(p) : p));

		const levelIdx = BaseLogger.LEVELS.indexOf(level);

		return (type, args) => {
			// Drop entries whose level sits after the configured one in LEVELS.
			const typeIdx = BaseLogger.LEVELS.indexOf(type);
			if (typeIdx > levelIdx) return;

			this.queue.push({
				ts: Date.now(),
				level: type,
				msg: printArgs(args).join(" "),
				bindings
			});

			if (this.opts.maxRows > 0 && this.queue.length >= this.opts.maxRows) {
				this.flush();
			}
		};
	}

	/**
	 * Translate a Moleculer log level name to the name used for the `level` label.
	 *
	 * @param {String} level
	 * @returns {String} "critical" for "fatal", "warning" for "warn", "debug" for
	 *   "trace"; any other value is returned unchanged.
	 */
	convertLogLevel(level) {
		switch (level) {
			case "fatal":
				return "critical";
			case "warn":
				return "warning";
			case "trace":
				return "debug";
			default:
				return level;
		}
	}

	/**
	 * Build the Loki stream labels of a queued entry.
	 *
	 * `opts.defaultLabels` are applied first, so the built-in labels win on a
	 * key collision. The `service` label is added only when the bindings carry `svc`.
	 *
	 * @param {Object} row Queued entry (`level` and `bindings` are read).
	 * @returns {Object} Label key-value pairs.
	 */
	getLabels(row) {
		const labels = {
			...(this.opts.defaultLabels ? this.opts.defaultLabels : {}),
			level: this.convertLogLevel(row.level),
			nodeID: row.bindings.nodeID,
			namespace: row.bindings.ns,
			module: row.bindings.mod
		};
		if (row.bindings.svc) labels.service = row.bindings.svc;

		return labels;
	}

	/**
	 * Flush queued log entries to Grafana Loki.
	 *
	 * Upload problems (network errors and non-2xx responses) are reported with
	 * `console.warn` and never reject the returned promise; the affected entries
	 * are not re-queued.
	 *
	 * @returns {Promise} Settles when the upload attempt has finished, or
	 *   immediately when the queue is empty.
	 */
	flush() {
		if (this.queue.length === 0) return this.broker.Promise.resolve();

		// Take the queued rows out first, so entries logged while the request is
		// in flight end up in the next batch.
		const rows = this.queue.splice(0, this.queue.length);

		// One stream per distinct label set. Grouping by level alone would ship
		// rows of different modules/services under the labels of whichever row
		// happened to come first.
		const streams = new Map();
		for (const row of rows) {
			const labels = this.getLabels(row);
			const key = JSON.stringify(labels);

			let stream = streams.get(key);
			if (!stream) {
				stream = { stream: labels, values: [] };
				streams.set(key, stream);
			}

			// Nanosecond timestamp as a string. Appending the zeros textually avoids
			// a multiplication whose result exceeds Number.MAX_SAFE_INTEGER.
			stream.values.push([`${row.ts}000000`, row.msg]);
		}

		const headers = { "Content-Type": "application/json" };
		// Only add the header when credentials are configured; an `undefined`
		// header value may otherwise be serialized as the string "undefined".
		if (this.opts.username) {
			const password = this.opts.password != null ? this.opts.password : "";
			headers.Authorization =
				"Basic " + Buffer.from(this.opts.username + ":" + password).toString("base64");
		}

		// Tolerate a base URL configured with a trailing slash.
		const endpoint = String(this.opts.url).replace(/\/+$/, "") + "/loki/api/v1/push";

		return fetch(endpoint, {
			method: "post",
			body: JSON.stringify({ streams: Array.from(streams.values()) }),
			headers
		})
			.then(res => {
				// fetch() resolves for HTTP error statuses too, so check explicitly.
				if (!res.ok) {
					console.warn(
						`Unable to upload logs to Loki server. Status: ${res.status} ${res.statusText}`
					);
				}
			})
			.catch(err => {
				console.warn("Unable to upload logs to Loki server. Error:" + err.message, err);
			});
	}
}
// Register the class under the "Loki" type name, so that requiring this module
// is enough to make `type: "Loki"` usable in the broker's `logger` option.
register("Loki", LokiLogger);
// Also export the class itself for direct use.
module.exports = LokiLogger;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment