-
-
Save arunoda/aa5aec1ff2757fa7baa9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
*/ | |
function Query(collection, selector, options) { | |
this._selector = selector; | |
this._selectorMatcher = LocalCollection._compileSelector(this._selector); | |
this._options = options || {}; | |
this._collection = collection; | |
this._amatureObservers = []; | |
this._observers = []; | |
this.snapshotInProgress = false; | |
this._needSnapshot = false; | |
this._idMap = {}; | |
this._pendingSnapshotCallbacks = []; | |
//sort specific fields | |
if(typeof(this._options.sort) == 'object') { | |
this._sortDocCacheMap = {}; | |
this._sortDocCacheList = []; | |
this._sortable = true; | |
this._sortFields = this._getSortFields(this._options.sort); | |
//always | |
this._sortFields.push('_id'); | |
this._sortFields = _.uniq(this._sortFields); | |
this._sortComparator = LocalCollection._compileSort(this._options.sort); | |
} | |
} | |
Query.prototype.added = function(doc) { | |
//if snapshotInProgress, do not proceed just ask for a queryReRun again | |
if(this.snapshotInProgress) { | |
this._needSnapshot = true; | |
return; | |
} | |
//if the current documents in the cursor is aligned with the limit, do no add them | |
if(!this._sortable && this._options.limit > 0 && _.keys(this._idMap).length >= this._options.limit) { | |
return; | |
} | |
if(!this._idMap[doc._id]) { | |
if(this._sortable && this._options.limit > 0) { | |
//add doc to the cacheList | |
var index = LocalCollection._binarySearch(this._sortComparator, this._sortDocCacheList, doc); | |
if(index < this._options.limit) { | |
//within the limit range | |
this._rawAdded(doc); | |
//check for limit exceeds | |
if(this._sortDocCacheList.length > this._options.limit) { | |
//remove the last doc | |
var lastId = this._sortDocCacheList[this._options.limit]._id; | |
this._rawRemoved(lastId); | |
} | |
} | |
} else { | |
this._rawAdded(doc); | |
} | |
} | |
}; | |
Query.prototype.removed = function(id) { | |
//if snapshotInProgress, do not proceed just ask for a queryReRun again | |
if(this.snapshotInProgress) { | |
this._needSnapshot = true; | |
return; | |
} | |
if(this._idMap[id]) { | |
this._rawRemoved(id); | |
//start query re-run if there is a limit | |
if(this._options.limit > 0) { | |
this.snapshot(); | |
} | |
} | |
}; | |
Query.prototype.changed = function(id, fields) { | |
if(this._idMap[id]) { | |
if(this._options.limit > 0 && this._sortable && this._isSortFieldsChanged(fields)) { | |
//if sorted and limited, we need to snapshot again | |
this.snapshot(); | |
} else { | |
this._rawChanged(id, fields); | |
} | |
} | |
}; | |
Query.prototype.removeExceptTheseIds = function(newIds) { | |
var self = this; | |
var existingIds = _.keys(this._idMap); | |
var removedIds = _.difference(existingIds, newIds); | |
removedIds.forEach(function(id) { | |
self._rawRemoved(id); | |
}); | |
}; | |
Query.prototype._rawAdded = function(doc) { | |
this._idMap[doc._id] = true; | |
this._observers.forEach(function(observer) { | |
observer.added(doc); | |
}); | |
//caching for sort | |
if(this._sortable) { | |
var sortCacheDoc = _.pick(doc, this._sortFields); | |
this._sortDocCacheMap[doc._id] = sortCacheDoc; | |
LocalCollection._insertInSortedList(this._sortComparator, this._sortDocCacheList, sortCacheDoc); | |
} | |
}; | |
Query.prototype._rawRemoved = function(id) { | |
delete this._idMap[id]; | |
this._observers.forEach(function(observer) { | |
observer.removed(id); | |
}); | |
//remove sort caching | |
if(this._sortable) { | |
var cachedDoc = this._sortDocCacheMap[id]; | |
delete this._sortDocCacheMap[id]; | |
var index = this._sortDocCacheList.indexOf(cachedDoc); | |
this._sortDocCacheList.splice(index, 1); | |
} | |
}; | |
Query.prototype._rawChanged = function(id, fields) { | |
this._observers.forEach(function(observer) { | |
observer.changed(id, fields); | |
}); | |
//caching for sort | |
if(this._sortable) { | |
var cachedDoc = this._sortDocCacheMap[id]; | |
var changedDoc = _.pick(fields, this._sortFields); | |
_.extend(cachedDoc, changedDoc); | |
this._sortDocCacheList.sort(this._sortComparator); | |
} | |
}; | |
Query.prototype.addObserver = function(observer, callback) { | |
console.log('AO', arguments); | |
this._amatureObservers.push({ | |
observer: observer, | |
callback: callback | |
}); | |
if(!this.snapshotInProgress) { | |
this.snapshot(); | |
} | |
}; | |
Query.prototype.removeObserver = function(observer) { | |
var index = this._observers.indexOf(observer); | |
if(index >= 0) { | |
this._observers.splice(index, 1); | |
} | |
}; | |
Query.prototype.countObservers = function() { | |
return this._observers.length; | |
}; | |
Query.prototype.snapshot = function(callback) { | |
if(this.snapshotInProgress) { | |
this._needSnapshot = true; | |
if(callback) { | |
this._pendingSnapshotCallbacks.push(callback); | |
} | |
return; | |
} | |
var self = this; | |
this.snapshotInProgress = true; | |
this._needSnapshot = false; | |
var callbacks = this._pendingSnapshotCallbacks; | |
if(callback) { | |
callbacks.push(callback); | |
} | |
this._pendingSnapshotCallbacks = []; | |
//we need to set this to avoid, adding observers at the middle of the SNAPSHOT | |
var amatureObservers = this._amatureObservers; | |
this._amatureObservers = []; | |
var idsExists = []; | |
this._collection._collection.find(this._selector, this._options).each(function(err, doc) { | |
if(err) { | |
fireCallbacks(err); | |
} else if(doc) { | |
idsExists.push(doc._id); | |
//handling for amature observers - need to added always | |
for(var lc=0; lc<amatureObservers.length; lc++) { | |
console.log('LOOP', amatureObservers[lc]); | |
var observer = amatureObservers[lc].observer; | |
observer.added(doc); | |
} | |
if(self.idExists(doc._id)) { | |
var fields = _.omit(doc, '_id'); | |
self._rawChanged(doc._id, fields); | |
} else { | |
self._rawAdded(doc); | |
} | |
} else { //end of the cursor | |
//trigger for remove docs | |
self.removeExceptTheseIds(idsExists); | |
//merge amature observers | |
for(var index in amatureObservers) { | |
var observerInfo = amatureObservers[index]; | |
self._observers.push(observerInfo.observer); | |
if(observerInfo.callback) { | |
observerInfo.callback(); | |
} | |
} | |
fireCallbacks(); | |
self.snapshotInProgress = false | |
//need an snapshot or if there is a new amature observers, we need to snapshot again | |
if(self._needSnapshot || self._amatureObservers.length > 0) { | |
self.snapshot(); | |
} | |
} | |
}); | |
function fireCallbacks(err) { | |
callbacks.forEach(function(callback) { | |
callback(err); | |
}); | |
} | |
}; | |
Query.prototype._isSortFieldsChanged = function(doc) { | |
var commonFields = _.intersection(_.keys(doc), _.without(this._sortFields, '_id')); | |
return commonFields.length > 0; | |
}; | |
Query.prototype._getSortFields = function(sortExpression) { | |
var fields = []; | |
if(sortExpression instanceof Array) { | |
sortExpression.forEach(function(exp) { | |
if(typeof(exp) == 'string') { | |
fields.push(exp); | |
} else { | |
fields.push(exp[0]); | |
} | |
}); | |
} else if(typeof sortExpression == 'object') { | |
fields = _.keys(sortExpression); | |
} else { | |
throw new Error('invalid sort expression: ' + JSON.stringify(sortExpression)); | |
} | |
return _.uniq(fields); | |
}; | |
Query.prototype.idExists = function idExists(id) { | |
return (this._idMap[id])? true: false; | |
}; | |
Query.prototype.getCursor = function() { | |
return this._collection._collection.find(this._selector, this._options); | |
}; | |
Meteor.SmartQuery = Query; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment