Skip to content

Instantly share code, notes, and snippets.

@mclark-newvistas
Created February 3, 2017 22:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mclark-newvistas/a9e03fe6c89e3b6c54f6ad29e87bb9bc to your computer and use it in GitHub Desktop.
Save mclark-newvistas/a9e03fe6c89e3b6c54f6ad29e87bb9bc to your computer and use it in GitHub Desktop.
var EventEmitter = require('events');
var crypto = require('crypto');
var r = require('rethinkdb');
// source: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions
function escapeRegExp(string) {
return string.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // $& means the whole matched string
}
// rewrite of deepstream.io-storage-rethinkdb
module.exports = function(conn, conf) {
var separator = conf.splitChar || '/';
var defaultTable = conf.defaultTable;
var tableMatch = new RegExp('^(\\w+)' + escapeRegExp(separator));
var primary = 'ds_id';
var emitter = new EventEmitter();
var creating = new Map(); // table => EventEmitter
var tableCache;
function splitKey(key) {
var table = key.match(tableMatch);
var values = [defaultTable, key];
if (table)
values = [table[1], key.substr(table[1].length + 1)];
// rethink can't have a key > 127 bytes; hash key and store alongside
if (values[1].length > 127) {
values[2] = values[1];
values[1] = crypto.createHash('sha256').update(values[1]).digest('hex');
}
return values;
}
// value is like { _v: 1, _d: { stuff I care about } }; _d might be an array
function convertFromDeepstream(value) {
value = JSON.parse(JSON.stringify(value)); // clone
var record = value._d;
delete value._d;
if (Array.isArray(record)) {
record = { __dsList: record };
}
record.__ds = value;
return record;
}
function convertToDeepstream(value) {
value = JSON.parse(JSON.stringify(value)); // clone
var record = value.__ds;
delete value.__ds;
record._d = value.__dsList ? value.__dsList : value;
return record;
}
function getTables() {
return r.tableList().run(conn).then(tables => {
tableCache = new Set(tables);
});
}
function createTable(table, callback) {
if (creating.has(table))
return creating.get(table).once('done', callback);
var created = new EventEmitter();
creating.set(table, created);
created.once('done', () => creating.delete(table)); // cleanup
created.once('done', callback);
return r.tableCreate(table, {
primaryKey: primary,
durability: 'soft', // matching upstream, we use the default, 'hard'
}).run(conn, err => {
if (err && err.msg.indexOf("already exists") === -1)
return created.emit('done', err);
if (err) // exists already, refresh table cache
return getTables().then(() => created.emit('done')).catch(err => created.emit('done', err));
tableCache.add(table);
created.emit('done');
});
}
function set(key, value, callback) {
var [table, id, fullKey] = splitKey(key);
// TODO: honestly, I'd rather not transparently create tables, for
// all that we can probably lock it down with permissions somehow
if (!tableCache.has(table)) // if no table, make it - match upstream
return createTable(table, err => {
if (err) return callback(err);
return set(key, value, callback);
});
value = convertFromDeepstream(value);
value[primary] = id;
if (fullKey)
value.__key = fullKey;
r.table(table).insert(value, {
returnChanges: false,
conflict: 'replace',
}).run(conn, callback);
}
function get(key, callback) {
var [table, id] = splitKey(key);
if (!tableCache.has(table)) // why no error? matching upstream
return callback(null, null);
r.table(table).get(id).run(conn).then(value => {
if (!value)
return callback(null, value);
delete value.__key; // in case is set
delete value[primary];
return callback(null, convertToDeepstream(value));
}).catch(e => callback(e));
}
function remove(key, callback) {
var [table, id] = splitKey(key);
if (!tableCache.has(table)) // why error? matching upstream
return callback(new Error("Table " + table + " does not exist"));
r.table(table).get(id).delete().run(conn, callback);
}
emitter.set = set;
emitter.get = get;
emitter.delete = remove;
emitter.isReady = false;
// get tables; not ready till then
getTables().then(() => {
emitter.isReady = true;
emitter.emit('ready');
}).catch(e => emitter.emit('error', e));
return emitter;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment