Skip to content

Instantly share code, notes, and snippets.

@intech
Last active July 27, 2022 12:51
Show Gist options
  • Save intech/52b82fca6d2c5f47d7cb4cfa2d3f3c2e to your computer and use it in GitHub Desktop.
Save intech/52b82fca6d2c5f47d7cb4cfa2d3f3c2e to your computer and use it in GitHub Desktop.
Moleculer middleware for the Objection.js ORM (https://github.com/vincit/objection.js), following the Shared Database pattern (https://microservices.io/patterns/data/shared-database.html)
// file: models/Base.js
const { Model } = require("objection");
module.exports = class BaseModel extends Model {
static get modelPaths() {
return [__dirname];
}
static get useLimitInFirst() {
return true;
}
$beforeInsert() {
const { timestamp } = this.constructor;
if (timestamp && timestamp.created && this[timestamp.created] === undefined)
this[timestamp.created] = new Date().toISOString();
}
$beforeUpdate() {
const { timestamp } = this.constructor;
if (timestamp && timestamp.updated && this[timestamp.updated] === undefined)
this[timestamp.updated] = new Date().toISOString();
}
};
// file: middlewares/objection.middleware.js
const { AsyncLocalStorage } = require("async_hooks");
const glob = require("glob").sync;
const Knex = require("knex");
const { Model, initialize } = require("objection");
const profilerSymbol = Symbol("profiler");
module.exports = function KnexMiddleware(options) {
const storage = new AsyncLocalStorage();
const connection = new Knex({
client: "postgres",
connection: process.env.DB,
asyncStackTraces: true,
pool: {
max: 10,
min: 1,
acquireTimeoutMillis: 5000,
createTimeoutMillis: 5000,
destroyTimeoutMillis: 5000,
idleTimeoutMillis: 5000,
reapIntervalMillis: 1000,
createRetryIntervalMillis: 200,
propagateCreateError: false
},
acquireConnectionTimeout: 10000,
...options
});
let models = {};
let logger = null;
const regexp = /(\$\d+),[\s\t\n]+?/gm;
return {
name: "Objection",
isTracing: false,
isMetrics: false,
[profilerSymbol]: null,
async created(broker) {
logger = broker.getLogger("objection");
this.isMetrics = broker.isMetricsEnabled();
this.isTracing = broker.isTracingEnabled();
this[profilerSymbol] = data => {
const ctx = storage.getStore() || {};
if (this.isTracing) {
const tracer = ctx.span ? ctx.span : broker.tracer;
const type = data.__knexTxId ? `db.${data.__knexTxId}` : "db.query";
const sql = data.sql.replace(regexp, "");
ctx._tracer = tracer.startSpan(`${type} '${sql}'`, {
type: data.__knexTxId ? "transaction" : "query"
});
}
if (this.isMetrics) {
broker.metrics.increment("db.query.total");
broker.metrics.increment("db.query.active");
// create timer instance for metric query time
ctx._metric = broker.metrics.timer("db.query.time", {
method: data.method,
query: "query"
});
}
storage.enterWith(ctx);
};
if (this.isMetrics) {
broker.metrics.register({
name: "db.query.total",
type: "counter",
unit: "query",
description: "Number of queries",
rate: true
});
broker.metrics.register({
name: "db.query.errors",
type: "counter",
unit: "query",
description: "Number of error queries",
rate: true
});
broker.metrics.register({
name: "db.query.active",
type: "gauge",
unit: "query",
description: "Number of active queries"
});
broker.metrics.register({
name: "db.query.time",
type: "histogram",
labelNames: ["query"],
quantiles: true,
buckets: true,
unit: "milliseconds",
description: "Query times in milliseconds",
rate: true
});
}
connection.prependListener("query", data => this[profilerSymbol](data));
connection.on("query-response", () => {
const ctx = storage.getStore();
if (this.isTracing && ctx._tracer) ctx._tracer.finish();
if (this.isMetrics) {
if (ctx._metric) ctx._metric();
broker.metrics.decrement("db.query.active");
}
});
connection.on("query-error", (error, obj) => {
logger.debug("trace error:", obj.sql);
// if("originalStack" in e) {
// e.stack = e.originalStack;
// console.trace(e.originalStack);
// }
const ctx = storage.getStore();
if (this.isTracing && ctx._tracer) {
ctx._tracer.setError(error);
}
// logger.error(error.toString());
});
models = glob(__dirname + "/../models/**/*.js", { absolute: true });
if (!models.length) {
logger.warn("There is no files in directory models");
}
Model.knex(connection);
models = Object.fromEntries(
models.map(model => {
const m = require(model);
return [m.name, m];
})
);
logger.info("Objection middleware ready");
},
localAction(next) {
return ctx => {
const { span } = ctx;
return storage.run({ span }, () => next(ctx));
};
},
localEvent(next) {
return ctx => {
const { span } = ctx;
return storage.run({ span }, () => next(ctx));
};
},
async starting() {
if (Object.values(models).length) {
await initialize(connection, Object.values(models));
} else {
await connection.queryBuilder().select(1);
}
logger.info("Connection has been established successfully.");
},
async stopping() {
if (connection) await connection.destroy();
logger.info("Objection middleware close");
},
serviceCreating(service, schema) {
if (!schema.name.startsWith("$")) {
service.models = models;
logger.debug(`Objection for \`${schema.name}\` ready`);
}
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment