Skip to content

Instantly share code, notes, and snippets.

@shalaby
Created May 21, 2016 16:01
Show Gist options
  • Save shalaby/dba3f6935e262880ecc0e62b36d58ade to your computer and use it in GitHub Desktop.
Save shalaby/dba3f6935e262880ecc0e62b36d58ade to your computer and use it in GitHub Desktop.
RxJS MongoDB Query Builder
/* @flow */
import { Observable, Disposable, ReplaySubject } from 'rx';
import mongo from 'mongodb';
import { dbUrl } from './config';
import { assign } from 'lodash';
class QueryBuilder {
_db$: Observable;
_selectors: Object;
_sub: Disposable;
constructor(db$: Observable, selectors?: Object) {
this._db$ = db$;
if (!selectors) {
this._selectors = assign({
collection: null,
query: {},
opts: {},
sort: {},
offset: 0,
limit: 0
}, selectors);
}
else {
this._selectors = selectors;
}
}
static connect(url: string): QueryBuilder {
var db$ = new ReplaySubject(1);
mongo.connect(url, (err, db) => {
if (err) {
db$.onError(err);
}
else {
db$.onNext(db);
}
db$.onCompleted();
});
this._sub = db$.subscribe(
_ => console.log('Connected to database on', url),
err => console.error('Database connection error:', err.message, err.stack)
);
return new QueryBuilder(db$);
}
close(): any {
this._sub.dispose();
return QueryBuilder;
}
collection(name: string): QueryBuilder {
const ss = assign({}, this._selectors, { collection: name });
return new QueryBuilder(this._db$, ss);
}
select(query?: Object = {}, opts?: Object = {}): QueryBuilder {
const ss = assign({}, this._selectors, { query, opts });
return new QueryBuilder(this._db$, ss);
}
selectOne(query?: Object = {}, opts?: Object = {}): QueryBuilder {
const ss = assign({}, this._selectors, {
query: query,
opts: opts,
limit: 1
});
return new QueryBuilder(this._db$, ss);
}
sort(sort: Object): QueryBuilder {
const ss = assign({}, this._selectors, { sort });
return new QueryBuilder(this._db$, ss);
}
skip(offset: number): QueryBuilder {
const ss = assign({}, this._selectors, { offset });
return new QueryBuilder(this._db$, ss);
}
limit(limit: number = 0): QueryBuilder {
const ss = assign({}, this._selectors, { limit });
return new QueryBuilder(this._db$, ss);
}
exec(): Observable {
const ss = this._selectors;
let db$ = this._db$
if (!ss.collection) return Observable.throw('You have to provide collection name.');
if (!db$) return Observable.throw('No db connection found.');
let o = db$
.flatMapLatest(db => {
var c = db.collection(ss.collection);
var cursor = c.find(ss.query, ss.opts).sort(ss.sort).skip(ss.offset).limit(ss.limit);
var obs = Observable.fromNodeCallback(cursor.toArray, cursor);
return obs();
})
.finally(() => {
this._$db = null;
this._selectors = null;
});
return ss.limit === 1 ? o.map(res => res[0]) : o;
}
}
// Module-level side effect: evaluating this module calls QueryBuilder.connect
// with the configured `dbUrl`, and the resulting builder is the default export.
export default QueryBuilder.connect(dbUrl);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment