Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mclark-newvistas/74a0a960e3e6d4b35455580b2acfc0fb to your computer and use it in GitHub Desktop.
Save mclark-newvistas/74a0a960e3e6d4b35455580b2acfc0fb to your computer and use it in GitHub Desktop.
var r = require('rethinkdb');
// you'd have to pass in an open rethink and deepstream connection, but that's trivial code to write
// rewrite of deepstream.io-provider-search-rethinkdb
module.exports = function(conn, client) {
var searches = new Map();
var pk = 'ds_id';
function getRow(path) {
// cribbed from official release
var parts = path.split(/[\[\]\.]/g).filter(val => val.trim().length > 0);
return parts.slice(1).reduce((row, part) => row(part), r.row(parts[0]));
}
function searchProvider(name, subscribed, response) {
if (!subscribed) {
if (!searches.has(name))
return console.error("Unexpected unsubscribe for", name);
searches.get(name)[0].delete();
searches.get(name)[1].close(err => {
if (err) return console.error("search close error", err);
});
searches.delete(name);
return;
}
try {
var search = JSON.parse(name.substr(7));
} catch (e) {
return console.error("JSON parse failed:", e);
}
if (!search.table)
return console.error('Missing parameter "table"');
if (!search.order != !search.limit) // XOR
return console.error('Must specify both "order" and "limit" together');
search.query = search.query || [];
var query = r.table(search.table);
// orderBy with and index must precede filters; must use an index for changefeeds
// TODO: should we check for index / add index if doesn't exist?
if (search.order)
query = query.orderBy({ index: r[search.desc ? 'desc' : 'asc'](search.order) });
var operators = new Set(['eq', 'match', 'gt', 'ge', 'lt', 'le', 'ne', 'in']);
for (var i = 0, il = search.query.length; i < il; i++) {
var condition = search.query[i];
if (condition.length !== 3)
return console.error("bad condition");
var path = condition[0];
var operator = condition[1];
var value = condition[2];
if (!operators.has(operator))
return console.error("bad operator");
if (operator === 'in' && !Array.isArray(value))
return console.error("'in' operator requires a JSON array");
var predicate;
if (operator !== 'in') {
predicate = getRow(path)[operator](value);
} else {
// doesn't take advantage of indexes, possible enhancement
// see https://www.rethinkdb.com/docs/sql-to-reql/javascript/
// TODO: path can be a path, not just a field
predicate = record => r.expr(value).contains(record(path));
}
query = query.filter(predicate);
}
query = query(pk);
if (search.limit)
query = query.limit(search.limit);
// we have a probably valid query
query.changes({
includeStates: true, includeInitial: true,
}).run(conn).then(cursor => {
response.accept();
searches.set(name, [client.record.getList(name), cursor]);
var ids = new Set();
function add(id) {
if (ids) return ids.add(id);
searches.get(name)[0].addEntry(id);
}
function remove(id) {
if (ids) return ids.delete(id);
searches.get(name)[0].removeEntry(id);
}
function replace(from, to) {
var values = ids;
if (!values)
values = searches.get(name)[0].getEntries();
values = values.filter(value => value !== from);
values.push(to);
if (!ids)
searches.get(name)[0].setEntries(values);
}
function ready() {
searches.get(name)[0].setEntries(Array.from(ids));
ids = undefined;
}
cursor.each((err, row) => {
if (err) {
if (err.message !== 'Cursor is closed.' || searches.has(name))
console.error("Unexpected cursor error:", err);
return;
}
// we already unsubscribed; we're done. Cursor also closed already above.
if (!searches.get(name))
return false;
if (row.state === 'ready')
return ready();
if (row.new_val && row.old_val)
replace(row.old_val, row.new_val);
else if (row.new_val)
add(row.new_val);
else if (row.old_val)
remove(row.old_val);
});
}).catch(e => {
response.reject();
console.error("query failed:", e);
});
}
client.record.listen("search\?", searchProvider);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment