Skip to content

Instantly share code, notes, and snippets.

@smddzcy
Last active May 14, 2019 20:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save smddzcy/acd58d9fc4ae0bb2102b2c5e36df6ea3 to your computer and use it in GitHub Desktop.
Save smddzcy/acd58d9fc4ae0bb2102b2c5e36df6ea3 to your computer and use it in GitHub Desktop.
Mongoose-like ORM for Cosmos DB SQL API
const _ = require('lodash');
const Redis = require('ioredis');
const sqlBuilder = require('mongo-sql'); // from https://github.com/smddzcy/mongo-sql#master
const uuid = require('uuid');
const throat = require('throat');
const DbError = require('../error/dbError');
const cast = require('../cast');
// Module-level Cosmos DB handles shared by every Schema instance.
// NOTE(review): `CosmosClient` is not imported anywhere in the visible source, and the
// `redis` / `logger` bindings used by the Schema cache helpers are never defined
// (only the `Redis` class is required) — confirm they are provided elsewhere.
const client = new CosmosClient({ endpoint: '...', key: '...' });
const database = client.database('...');
// Expiry passed to `redis.setex` when a query result is cached (SETEX takes seconds).
const cacheTtl = 3600;
// Fields merged into every schema in addition to the user-declared ones. On writes
// `Schema#map` copies these through untouched instead of casting/validating them.
// Presumably the underscore-prefixed ones are Cosmos DB system properties — TODO confirm.
const autoAddedFields = {
id: String,
_rid: String,
_self: String,
_etag: String,
_attachments: String,
_ts: Number,
};
/**
 * Mongoose-like model for a single Cosmos DB (SQL API) container.
 *
 * A Schema describes the fields of one container, casts and validates documents
 * against that description and exposes CRUD helpers. Query results are cached
 * through the module-level `redis` client for `cacheTtl` seconds; every write
 * drops the model's whole `sql` cache.
 *
 * A field description is either a type constructor (`String`, `Number`, `Boolean`,
 * `Date`, `Array`, `Object`, `Schema.Types.*`) or a descriptor object:
 * `{ type, default, required, enum, trim, uppercase, minlength, maxlength, min, max, ref }`.
 */
class Schema {
  /**
   * @param {Object} [schema] Map of field name to field description.
   * @param {Object} options
   * @param {boolean} [options.timestamps=true] Add `createdAt` / `updatedAt` fields.
   * @param {string} [options.model] Model name; used for the registry, the cache
   *   namespace and, when `collection` is absent, to derive the container id.
   * @param {string} [options.collection] Container id.
   * @param {string} options.partitionKey Name of the partition key field (required).
   * @throws {Error} When neither `model` nor `collection` is given, or `partitionKey` is missing.
   */
  constructor(schema, options) {
    this.timestamps = true;
    if (options) {
      ['timestamps', 'collection', 'model', 'partitionKey'].forEach(field => {
        if (field in options) {
          this[field] = options[field];
        }
      });
    }
    this.virtuals = [];
    this.hooks = [];
    this.schema = { ...autoAddedFields, ...(schema || {}) };
    if (this.timestamps) {
      this.schema.createdAt = Date;
      this.schema.updatedAt = Date;
    }
    if (!this.collection) {
      if (!this.model) {
        throw new Error('Schema should either define a `model` or `collection`');
      }
      this.collection = `${_.camelCase(this.model)}s`;
    }
    if (!this.partitionKey) {
      throw new Error('Schema should define a `partitionKey`');
    }

    // Bind every public/internal method so they can be passed around as callbacks.
    [
      'ready', 'deleteCollection', 'createCollection', 'recreateCollection',
      'mapVirtuals', 'virtual', 'addHook', 'map', 'create',
      '_getCacheKey', '_deleteCache', '_deleteCacheForType', '_getFromCacheOrSet',
      '_find', 'find', 'findOne', 'findById', 'findAll',
      '_updateItem', 'findOneAndUpdate', 'findOneOrCreate',
      'update', 'updateOne', 'updateById', 'remove', 'removeById',
    ].forEach(method => {
      this[method] = this[method].bind(this);
    });

    // Keep the creation promise so `ready()` can await it instead of polling.
    this._creation = this.createCollection();
    // createCollection() already logs the failure and `ready()` re-surfaces it; this
    // handler only prevents an unhandled rejection when nobody awaits `ready()`.
    this._creation.catch(() => {});

    // Register the model so other schemas can populate references to it by name.
    if (this.model) {
      Schema._registeredModels[this.model] = this;
    }
  }

  /**
   * Resolves once the container handle is available.
   *
   * @returns {Promise<boolean>} Resolves to `true`.
   * @throws {DbError} When the container could not be created.
   */
  async ready() {
    if (!this._container) {
      await this._creation;
    }
    if (!this._container) {
      throw new DbError(`Container ${this.collection} is not available`);
    }
    return true;
  }

  /** Deletes the backing container. @returns {Promise<*>} */
  deleteCollection() {
    return this._container.delete();
  }

  /**
   * Creates the backing container if it does not exist and stores its handle.
   *
   * @returns {Promise<void>}
   * @throws {DbError} When the container cannot be created.
   */
  createCollection() {
    return database.containers.createIfNotExists({
      id: this.collection,
      partitionKey: { paths: [`/${this.partitionKey}`], kind: 'Hash' },
    }).then(({ container }) => {
      this._container = container;
    }).catch(err => {
      console.error(err);
      throw new DbError(`Couldn't create container ${this.collection}: ${err}`);
    });
  }

  /** Drops and re-creates the backing container. @returns {Promise<void>} */
  async recreateCollection() {
    await this.deleteCollection();
    await this.createCollection();
  }

  /**
   * Casts a raw value to the given schema type.
   *
   * @param {*} type Type constructor taken from the schema.
   * @param {*} val Raw value.
   * @param {string} [path='unknown'] Field name, used in cast error messages.
   * @returns {*} The cast value; unknown types are returned untouched.
   */
  static castVal(type, val, path = 'unknown') {
    switch (type) {
      case Number:
        return cast.number(val);
      case String:
        return cast.string(val, path);
      case Boolean:
        return cast.boolean(val, path);
      case Date:
        return cast.date(val);
      case Array:
        return _.toArray(val);
      default:
        // Object, Schema.Types.Mixed, Schema.Types.Reference and anything unknown
        // are stored as-is.
        return val;
    }
  }

  /**
   * Applies the write-only rules of a descriptor (enum, string and number constraints).
   *
   * @param {Object} field Field descriptor.
   * @param {*} val Already-cast value.
   * @param {string} path Field name, used in error messages.
   * @returns {*} The (possibly transformed) value.
   * @throws {DbError} When a constraint is violated.
   */
  static _validateForWrite(field, val, path) {
    let out = val;
    if ('enum' in field && !field.enum.includes(out)) {
      throw new DbError(`Value '${out}' is not a valid enum value, possible values are: ${field.enum}`);
    }
    if (field.type === String) {
      // Flags are checked by value so `trim: false` / `uppercase: false` are honoured.
      if (field.trim) {
        out = out.trim();
      }
      if (field.uppercase) {
        out = out.toLocaleUpperCase();
      }
      if ('minlength' in field && out.length < field.minlength) {
        throw new DbError(`Value '${out}' in path '${path}' is too short, minlength: ${field.minlength}`);
      }
      if ('maxlength' in field && out.length > field.maxlength) {
        throw new DbError(`Value '${out}' in path '${path}' is too long, maxlength: ${field.maxlength}`);
      }
    }
    if (field.type === Number) {
      if ('min' in field && out < field.min) {
        throw new DbError(`Value '${out}' in path '${path}' is too small, min: ${field.min}`);
      }
      if ('max' in field && out > field.max) {
        throw new DbError(`Value '${out}' in path '${path}' is too big, max: ${field.max}`);
      }
    }
    return out;
  }

  /**
   * Evaluates every registered virtual getter (with the document as `this`) and
   * stores the result on the document.
   *
   * @param {Object} obj Document to decorate (mutated in place).
   */
  mapVirtuals(obj) {
    this.virtuals.forEach(virtual => {
      obj[virtual.field] = virtual.getter.call(obj);
    });
  }

  /**
   * Starts the declaration of a virtual field: `schema.virtual('x').get(fn)`.
   *
   * @param {string} field Name of the virtual field.
   * @returns {{get: function(Function): number}}
   */
  virtual(field) {
    return {
      get: getter => this.virtuals.push({ field, getter }),
    };
  }

  /**
   * Registers a hook.
   *
   * @param {string} type Hook type (this class runs `'beforeCreate'`).
   * @param {function(Object): *} hook Called with the document; may return a promise.
   */
  addHook(type, hook) {
    this.hooks.push({ type, hook });
  }

  /**
   * Runs all hooks of the given type concurrently.
   *
   * @param {string} type Hook type.
   * @param {Object} object Document handed to each hook.
   * @returns {Promise<Object>} The same document.
   */
  async _runHooks(type, object) {
    await Promise.all(this.hooks
      .filter(hook => hook.type === type)
      .map(hook => hook.hook(object)));
    return object;
  }

  /**
   * Maps a raw document (or array of documents) onto the schema: keeps only schema
   * fields, applies defaults, casts values and — for writes — validates them.
   *
   * @param {Object|Object[]|null|undefined} obj Raw document(s).
   * @param {boolean} [isRead=true] `true` for documents coming from the store
   *   (virtuals are added, write rules are skipped); `false` for documents about to
   *   be written (auto-added fields are passed through untouched).
   * @returns {Object|Object[]|null|undefined} Mapped document(s); falsy input is returned as is.
   * @throws {DbError} On a missing required value or a violated write rule.
   */
  map(obj, isRead = true) {
    if (!obj) return obj;
    if (Array.isArray(obj)) {
      return obj.map(el => this.map(el, isRead));
    }
    const ret = Object.keys(this.schema).reduce((xs, x) => {
      let val = obj[x];
      if (!isRead && x in autoAddedFields) {
        // On writes the auto-added fields are propagated untouched.
        if (val !== undefined) {
          xs[x] = val;
        }
        return xs;
      }
      const field = this.schema[x];
      const isDescriptor = Boolean(field) && typeof field === 'object';
      if (val == null) {
        if (isDescriptor) {
          if ('default' in field) {
            val = typeof field.default === 'function' ? field.default() : field.default;
          } else if (field.required) {
            throw new DbError(`Value '${val}' in path '${x}' is required`);
          }
        }
        if (val == null) {
          xs[x] = null; // don't leave values undefined, it's problematic
          return xs;
        }
        // A default was produced: fall through so it is cast and validated like any value.
      }
      if (typeof field === 'function') {
        val = Schema.castVal(field, val, x);
      } else {
        val = Schema.castVal(field.type, val, x);
        if (!isRead) {
          val = Schema._validateForWrite(field, val, x);
        }
      }
      xs[x] = val;
      return xs;
    }, {});
    if (isRead) {
      this.mapVirtuals(ret);
    }
    return ret;
  }

  /**
   * Inserts a new document.
   *
   * @param {Object} model Document to insert (not mutated).
   * @param {Object} [options]
   * @param {boolean} [options.deleteCache=true] Drop the model's `sql` cache after the insert.
   * @param {boolean} [options.runHooks=true] Run the `beforeCreate` hooks.
   * @returns {Promise<Object>} The stored document, mapped for reading.
   * @throws {DbError} When validation fails.
   */
  async create(model, options = {}) {
    // Each flag defaults independently so passing only one of them keeps the other on.
    const { deleteCache = true, runHooks = true } = options || {};
    const doc = { ...model };
    if (this.timestamps) {
      const now = new Date();
      if (!('createdAt' in doc)) {
        doc.createdAt = now;
      }
      if (!('updatedAt' in doc)) {
        doc.updatedAt = now;
      }
    }
    const mappedModel = this.map(doc, false);
    if (runHooks) {
      await this._runHooks('beforeCreate', mappedModel);
    }
    if (!mappedModel.id) {
      // see: https://github.com/Azure/azure-cosmos-js/issues/241
      mappedModel.id = uuid.v4();
    }
    const { body } = await this._container.items.create(mappedModel);
    if (deleteCache) {
      // Invalidate after the write so a concurrent read cannot re-cache pre-write data.
      this._deleteCacheForType('sql');
    }
    return this.map(body, true);
  }

  /**
   * Builds a cache key. The namespace is the model name, falling back to the
   * collection id so schemas without a model name do not share one namespace.
   *
   * @param {string} type Cache category (this class uses `'sql'`).
   * @param {string} label Entry label within the category.
   * @returns {string}
   */
  _getCacheKey(type, label) {
    return `${this.model || this.collection}:${type}:${label}`;
  }

  /**
   * Best-effort removal of one cache entry; failures are only logged.
   *
   * @param {string} type Cache category.
   * @param {string} label Entry label.
   */
  _deleteCache(type, label) {
    redis.del(this._getCacheKey(type, label)).catch(err => {
      logger.error(`Redis - Error while deleting cache for "${type}" "${label}": ${err}`);
    });
  }

  /**
   * Best-effort removal of every cache entry of a category; failures are only logged.
   *
   * @param {string} type Cache category.
   */
  _deleteCacheForType(type) {
    redis.keys(this._getCacheKey(type, '*'))
      // Return the deletions so their failures reach the catch below.
      .then(keys => Promise.all(keys.map(key => redis.del(key))))
      .catch(err => {
        logger.error(`Redis - Error while deleting cache for all keys under "${type}": ${err}`);
      });
  }

  /**
   * Returns the cached value for a key, or computes it with `setter` and caches it.
   * Cache failures (read or write) are logged and never fail the operation.
   *
   * @param {string} type Cache category.
   * @param {string} label Entry label.
   * @param {function(): Promise<*>} setter Produces the value on a cache miss.
   * @returns {Promise<*>} Mapped documents on a hit, the raw `setter` result on a miss.
   */
  async _getFromCacheOrSet(type, label, setter) {
    const cacheKey = this._getCacheKey(type, label);
    let cacheResult = null;
    try {
      cacheResult = await redis.get(cacheKey);
    } catch (err) {
      logger.error(`Redis - Error while reading cache for "${type}" "${label}": ${err}`);
    }
    if (cacheResult) {
      logger.info(`Redis - Key: ${cacheKey}, Cache hit: ${cacheResult.slice(0, 10)}...`);
      return this.map(JSON.parse(cacheResult), true);
    }
    logger.info(`Redis - Key: ${cacheKey}, Cache miss`);
    const result = await setter();
    redis.setex(cacheKey, cacheTtl, JSON.stringify(result)).catch(err => {
      logger.error(`Redis - Error while setting cache for "${type}" "${label}": ${err}`);
    });
    return result;
  }

  /**
   * Replaces reference ids in `docs` with the referenced documents.
   * A reference that cannot be loaded is set to `null`.
   *
   * @param {Object[]} docs Mapped documents (mutated in place).
   * @param {string[]} fields Reference fields to populate.
   * @returns {Promise<void>}
   * @throws {DbError} When a field is unknown, has no `ref`, or its model is not registered.
   */
  async _populate(docs, fields) {
    await Promise.all(fields.map(async field => {
      const descriptor = this.schema[field];
      if (!descriptor) {
        throw new DbError(`${field} does not exist in the schema`);
      }
      if (!descriptor.ref) {
        throw new DbError(`${field} does not have a 'ref' field`);
      }
      const model = Schema._registeredModels[descriptor.ref];
      if (!model) {
        throw new DbError(`${descriptor.ref} is not a registered model`);
      }
      const ids = _.uniq(_.compact(docs.map(doc => doc[field])));
      // At most 20 lookups in flight; a failed lookup deliberately yields null.
      const found = await Promise.all(
        ids.map(throat(20, id => model.findById(id).catch(() => null))),
      );
      const byId = new Map(ids.map((id, idx) => [id, model.map(found[idx])]));
      docs.forEach(doc => {
        doc[field] = (doc[field] && byId.get(doc[field])) || null;
      });
    }));
  }

  /**
   * Runs a query built by `mongo-sql` against the container (through the cache).
   *
   * @param {Object} query Mongo-style `where` clause.
   * @param {Object} [options={}] Extra query-builder options; `populate` (string or
   *   string[]) lists reference fields to resolve.
   * @returns {Promise<Object[]>} Mapped documents.
   * @throws {DbError} On an invalid `populate` request.
   */
  async _find(query, options = {}) {
    const sql = sqlBuilder.sql({
      type: 'select',
      table: this.collection,
      where: query,
      ...options,
    });
    const sqlStr = sql.toString();
    const params = sql.values.map((value, idx) => ({ name: `@p${idx + 1}`, value }));
    const valueByName = new Map(params.map(param => [param.name, param.value]));
    // Inline the parameters to get a unique label. JSON keeps object/array values
    // distinguishable; unknown placeholders are left untouched.
    const cacheLabel = sqlStr.replace(/@(p\d*)/g, token => (
      valueByName.has(token) ? JSON.stringify(valueByName.get(token)) : token
    ));
    const rawDocs = await this._getFromCacheOrSet('sql', cacheLabel, async () => {
      const querySpec = {
        query: sqlStr,
        parameters: params,
      };
      const { result: items } = await this._container.items.query(querySpec, {
        enableCrossPartitionQuery: true,
      }).toArray();
      return items;
    });
    const mappedDocs = this.map(rawDocs);
    const populate = [].concat(options.populate || []);
    if (populate.length > 0) {
      await this._populate(mappedDocs, populate);
    }
    return mappedDocs;
  }

  /**
   * Finds every document matching the query.
   *
   * @param {Object} [query] Mongo-style `where` clause.
   * @param {Object} [options] See `_find`.
   * @returns {Promise<Object[]>}
   */
  find(query, options) {
    const noQuery = !query || Object.keys(query).length === 0;
    const noOptions = !options || Object.keys(options).length === 0;
    if (noQuery && noOptions) {
      // minor optimization
      return this.findAll();
    }
    return this._find(query, options);
  }

  /**
   * Finds the first document matching the query.
   *
   * @param {Object} query Mongo-style `where` clause.
   * @param {Object} [options] See `_find`.
   * @returns {Promise<Object|undefined>} The document, or `undefined` when none matches.
   */
  async findOne(query, options) {
    const opts = options || {};
    let items = [];
    try {
      items = await this._find(query, { ...opts, limit: 1 });
    } catch (ignored) {
      // single-partition query failed, try cross-partition
      items = await this._find(query, opts);
    }
    return items[0];
  }

  /**
   * @param {string} id Document id.
   * @returns {Promise<Object|undefined>}
   */
  findById(id) {
    return this.findOne({ id });
  }

  /**
   * Reads every document of the container (through the cache).
   *
   * @returns {Promise<Object[]>}
   */
  async findAll() {
    const rawDocs = await this._getFromCacheOrSet('sql', `select * from ${this.collection}`, async () => {
      const { result: items } = await this._container.items.readAll().toArray();
      return items;
    });
    return this.map(rawDocs);
  }

  /**
   * Applies `fields` to an existing document and replaces it in the store.
   *
   * @param {Object} item Existing document (not mutated).
   * @param {Object} fields Map of path (lodash `_.set` syntax) to new value.
   * @returns {Promise<Object>} The stored document, mapped for reading.
   * @throws {DbError} When `item` is missing or validation fails.
   */
  async _updateItem(item, fields) {
    if (!item) {
      throw new DbError('No item found to update');
    }
    // Deep clone: `_.set` on a nested path must not write into the caller's document.
    const newItem = _.cloneDeep(item);
    Object.entries(fields).forEach(([key, val]) => {
      _.set(newItem, key, val);
    });
    newItem.updatedAt = new Date();
    const model = this.map(newItem, false);
    const { body } = await this._container.item(item.id, item[this.partitionKey]).replace(model);
    this._deleteCacheForType('sql');
    return this.map(body, true);
  }

  /**
   * Updates the first document matching the query.
   *
   * @param {Object} query Mongo-style `where` clause.
   * @param {Object} fields Map of path to new value.
   * @param {Object} [opts]
   * @param {boolean} [opts.upsert] Create `{ ...query, ...fields }` when nothing matches.
   * @returns {Promise<Object>} The updated (or created) document.
   * @throws {DbError} When nothing matches and `upsert` is not set, or validation fails.
   */
  async findOneAndUpdate(query, fields, opts) {
    const item = await this.findOne(query);
    if (item) {
      return this._updateItem(item, fields);
    }
    if (opts && opts.upsert) {
      return this.create({ ...query, ...fields });
    }
    throw new DbError(`No items found for query: ${JSON.stringify(query)}`);
  }

  /**
   * Returns the first document matching `doc`, creating it when none exists.
   *
   * @param {Object} doc Used both as the query and as the document to create.
   * @returns {Promise<Object>}
   */
  async findOneOrCreate(doc) {
    const one = await this.findOne(doc);
    return one || this.create(doc);
  }

  /**
   * Updates every document matching the query.
   *
   * @param {Object} query Mongo-style `where` clause.
   * @param {Object} fields Map of path to new value.
   * @returns {Promise<Object[]>} The updated documents.
   */
  async update(query, fields) {
    const items = await this.find(query);
    return Promise.all(items.map(item => this._updateItem(item, fields)));
  }

  /**
   * Updates the first document matching the query.
   *
   * @param {Object} query Mongo-style `where` clause.
   * @param {Object} fields Map of path to new value.
   * @returns {Promise<Object>}
   * @throws {DbError} When nothing matches.
   */
  async updateOne(query, fields) {
    const item = await this.findOne(query);
    return this._updateItem(item, fields);
  }

  /**
   * Updates the document with the given id.
   *
   * @param {string} id Document id.
   * @param {Object} fields Map of path to new value.
   * @returns {Promise<Object>}
   * @throws {DbError} When the document does not exist.
   */
  async updateById(id, fields) {
    const item = await this.findById(id);
    return this._updateItem(item, fields);
  }

  /**
   * Deletes an already-loaded document and drops the model's `sql` cache.
   *
   * @param {Object} item Document to delete.
   * @returns {Promise<*>} The response of the container's delete call.
   */
  async _removeItem(item) {
    const response = await this._container.item(item.id, item[this.partitionKey]).delete();
    this._deleteCacheForType('sql');
    return response;
  }

  /**
   * Deletes every document matching the query.
   *
   * @param {Object} query Mongo-style `where` clause.
   * @returns {Promise<Array>} One delete response per removed document.
   */
  async remove(query) {
    const items = await this.find(query);
    // The documents are already loaded, so there is no need to look each one up again.
    return Promise.all(items.map(item => this._removeItem(item)));
  }

  /**
   * Deletes the document with the given id.
   *
   * @param {string} id Document id.
   * @returns {Promise<*>} The response of the container's delete call.
   * @throws {DbError} When the document does not exist.
   */
  async removeById(id) {
    const item = await this.findById(id);
    if (!item) {
      throw new DbError(`No item found for id: ${id}`);
    }
    return this._removeItem(item);
  }
}
// Static members of Schema:
//  - `_registeredModels`: registry of Schema instances keyed by model name, used to
//    resolve `ref` fields when populating.
//  - `Types`: marker types whose values `Schema.castVal` stores without casting.
Object.assign(Schema, {
  _registeredModels: {},
  Types: {
    Mixed: i => i,
    Reference: i => i,
  },
});
module.exports = Schema;
const CastError = require('../error/castError');
/**
 * Casts a value to a boolean.
 *
 * `null` / `undefined` are passed through. Any value contained in
 * `module.exports.convertToTrue` becomes `true`, any value contained in
 * `module.exports.convertToFalse` becomes `false`. The sets are looked up on every
 * call, so replacing them on the export changes the accepted values.
 *
 * @param {*} value Raw value.
 * @param {string} path Field name, forwarded to the CastError.
 * @returns {boolean|null|undefined}
 * @throws {CastError} When the value is in neither set.
 */
const castBoolean = (value, path) => {
  if (value == null) {
    return value;
  }
  const isTrue = module.exports.convertToTrue.has(value);
  if (!isTrue && !module.exports.convertToFalse.has(value)) {
    throw new CastError('boolean', value, path);
  }
  return isTrue;
};

module.exports = castBoolean;
module.exports.convertToTrue = new Set([true, 'true', 'True', 1, '1', 'yes', 'Yes']);
module.exports.convertToFalse = new Set([false, 'false', 'False', 0, '0', 'no', 'No']);
const assert = require('assert');
module.exports = (value) => {
if (value == null || value === '') {
return null;
}
if (value instanceof Date) {
assert.ok(!isNaN(value.valueOf()));
return value;
}
let date;
assert.ok(typeof value !== 'boolean');
if (value instanceof Number || typeof value === 'number') {
date = new Date(value);
} else if (typeof value === 'string' && !isNaN(Number(value)) && (Number(value) >= 275761 || Number(value) < -271820)) {
// string representation of milliseconds take this path
date = new Date(Number(value));
} else if (typeof value.valueOf === 'function') {
// support for moment.js. This is also the path strings will take because
// strings have a `valueOf()`
date = new Date(value.valueOf());
} else {
// fallback
date = new Date(value);
}
if (!isNaN(date.valueOf())) {
return date;
}
return value;
};
const date = require('./date');
const boolean = require('./boolean');
const number = require('./number');
const string = require('./string');
// Public surface of the cast helpers: one caster per primitive type
// (date, boolean, number, string), re-exported from the sibling modules.
module.exports = {
date,
boolean,
number,
string,
};
const assert = require('assert');
module.exports = (val) => {
if (isNaN(val)) {
return null;
}
if (val == null) {
return val;
}
if (val === '') {
return null;
}
if (typeof val === 'string' || typeof val === 'boolean') {
val = Number(val);
}
if (isNaN(val)) {
return null;
}
if (val instanceof Number) {
return val;
}
if (typeof val === 'number') {
return val;
}
if (!Array.isArray(val) && typeof val.valueOf === 'function') {
return Number(val.valueOf());
}
// eslint-disable-next-line eqeqeq
if (val.toString && !Array.isArray(val) && val.toString() == Number(val)) {
// eslint-disable-next-line no-new-wrappers
return new Number(val);
}
assert.ok(false);
};
const CastError = require('../error/castError');
module.exports = function (value, path) {
// If null or undefined
if (value == null) {
return value;
}
// handle documents being passed
if (value.id && typeof value.id === 'string') {
return value.id;
}
// Re: gh-647 and gh-3030, we're ok with casting using `toString()`
// **unless** its the default Object.toString, because "[object Object]"
// doesn't really qualify as useful data
if (value.toString
&& value.toString !== Object.prototype.toString
&& !Array.isArray(value)) {
return value.toString();
}
throw new CastError('string', value, path);
};
const util = require('util');
const DbError = require('./dbError');
/**
 * Error raised when a value cannot be cast to the type declared in the schema.
 *
 * Kept as a constructor function (not a `class`) because it chains to
 * `DbError.call(this, ...)`.
 *
 * @param {string} type Target type name (e.g. 'string', 'boolean').
 * @param {*} value The value that failed to cast.
 * @param {string} path Field name the value belongs to.
 * @param {Error} [reason] Underlying error, if any.
 */
function CastError(type, value, path, reason) {
  // Always show the value wrapped in double quotes.
  let stringValue = util.inspect(value);
  stringValue = stringValue.replace(/^'/, '"').replace(/'$/, '"');
  if (stringValue.charAt(0) !== '"') {
    stringValue = `"${stringValue}"`;
  }
  DbError.call(this, `Cast to ${type} failed for value ${stringValue} at path "${path}"`);
  this.name = 'CastError';
  if (Error.captureStackTrace) {
    Error.captureStackTrace(this);
  } else {
    this.stack = new Error().stack;
  }
  this.stringValue = stringValue;
  this.kind = type;
  this.value = value;
  this.path = path;
  this.reason = reason;
}

CastError.prototype = Object.create(DbError.prototype);
// Point `constructor` back at CastError (it used to be DbError), so
// `err.constructor` identifies the actual error class.
CastError.prototype.constructor = CastError;

/**
 * Attaches the model to the error and appends its name to the message.
 *
 * @param {Object} model Model the failing document belongs to; its `modelName`
 *   is used in the message.
 *   NOTE(review): Schema instances in this project expose `model`, not
 *   `modelName` — confirm which object callers pass here.
 */
CastError.prototype.setModel = function (model) {
  this.model = model;
  this.message = `Cast to ${this.kind} failed for value ${this.stringValue} at path "${this.path}"`
    + ` for model "${model.modelName}"`;
};

module.exports = CastError;
/**
 * Base error type of the data layer.
 *
 * Kept as a constructor function (not a `class`) because subclasses chain to it
 * with `DbError.call(this, msg)`, which a class constructor would reject.
 *
 * @param {string} msg Human-readable error message.
 */
function DbError(msg) {
  Error.call(this);
  // Set name/message first: the stack header is built from them at capture time.
  this.message = msg;
  this.name = 'DbError';
  if (Error.captureStackTrace) {
    // Passing the constructor keeps this frame out of the trace.
    Error.captureStackTrace(this, DbError);
  } else {
    this.stack = new Error().stack;
  }
}

// Inherit from Error and point `constructor` back at DbError (it used to be Error).
DbError.prototype = Object.create(Error.prototype, {
  constructor: { value: DbError, writable: true, configurable: true },
});
module.exports = DbError;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment