Skip to content

Instantly share code, notes, and snippets.

@victorpoluceno
Created December 5, 2012 14:28
Show Gist options
  • Save victorpoluceno/4215887 to your computer and use it in GitHub Desktop.
Save victorpoluceno/4215887 to your computer and use it in GitHub Desktop.
Incremental map reduce and ObjectId timestamp
import time
import datetime
import pymongo
from bson.objectid import ObjectId
from bson.code import Code
from bson.son import SON
# Fixture rows loaded into the ``bug`` collection: five documents named 'a'
# followed by five named 'b', every one carrying ``data == 1``.  Each row is
# its own dict object.
DATA_SAMPLE = list(
    {'data': 1, 'name': label}
    for label in ('a',) * 5 + ('b',) * 5
)
# Map step (JavaScript source handed to MongoDB): for each input document,
# emit its ``data`` field wrapped in an object, keyed by the document's
# ``name`` field.
_map = Code("""function (){
emit(this.name, {'data': this.data});
};""")
# Reduce step (JavaScript source): add up the ``data`` field of every value
# received for ``key`` and return the total as ``{data: sum}``, i.e. the same
# shape that the map step emits.
_reduce = Code("""function (key, values){
var sum = 0;
values.forEach(function(doc){
sum += doc.data;
});
return {data: sum};
};""")
# Finalize step (JavaScript source): stamp the reduced value with ``ts`` and
# return it.  ``ts`` is not defined inside this function; Test.map_reduce
# passes it in through the ``scope`` argument of the map-reduce call, as the
# current time in milliseconds.
_finalize = Code("""function (key, reducedValue) {
reducedValue.ts = ts;
return reducedValue;
};""")
class Test:
    """Reproduction scenario for incremental map-reduce driven by ObjectId time.

    Source documents live in ``test.bug`` and aggregated totals in
    ``test.bug_stats``.  Every map-reduce run stamps its output with the
    current time (``value.ts``, milliseconds since the epoch).  The next run
    turns the newest stamp into an ObjectId and only processes documents
    whose ``_id`` is greater than or equal to it.
    """

    def __init__(self):
        """Connect with MongoClient's default host settings and reset the data.

        Both collections are dropped and ``bug`` is repopulated from
        DATA_SAMPLE.
        """
        self.m = pymongo.MongoClient(fsync=True)
        self.m.test.drop_collection('bug')
        self.m.test.drop_collection('bug_stats')
        for doc in DATA_SAMPLE:
            # Insert a copy: insert() adds an ``_id`` key to the dict it is
            # handed, which would otherwise leak stale ids (and their
            # embedded timestamps) into the shared DATA_SAMPLE rows.
            self.m.test.bug.insert(dict(doc))

    @staticmethod
    def _utc_datetime_from_ms(ts_ms):
        """Convert milliseconds since the Unix epoch to a naive UTC datetime."""
        return datetime.datetime(1970, 1, 1) + datetime.timedelta(
            milliseconds=ts_ms)

    def find_checkpoint(self, db):
        """Return the time of the most recent map-reduce run, or None.

        :param db: database holding the ``bug_stats`` output collection.
        :returns: a naive ``datetime`` expressed in UTC, or ``None`` when
            ``bug_stats`` is empty (no run has happened yet).
        """
        # The stamp written by the finalize step lives under ``value``, so
        # the sort key has to be ``value.ts``; a top-level ``ts`` does not
        # exist and sorting on it leaves the order undefined.
        latest = db.bug_stats.find_one(
            sort=[('value.ts', pymongo.DESCENDING)])
        if latest is None:
            return None
        # ObjectId.from_datetime() interprets naive datetimes as UTC, so the
        # checkpoint must be built in UTC.  Using local time shifts the
        # resulting ObjectId by the machine's UTC offset.
        return self._utc_datetime_from_ms(latest['value']['ts'])

    def map_reduce(self, db):
        """Run one incremental map-reduce pass from ``bug`` into ``bug_stats``.

        :param db: database holding the ``bug`` and ``bug_stats`` collections.
        :returns: the full map-reduce response (``full_response=True``).
        :raises AssertionError: when a checkpoint exists and the number of
            documents at or after it is not exactly one (the situation this
            reproduction is built around).
        """
        # ObjectIds carry their creation time with one-second resolution and
        # the checkpoint filter uses ``$gte``.  Waiting a second keeps
        # documents written just before this call in an earlier second than
        # the checkpoint recorded below, so the next pass does not pick them
        # up again.
        time.sleep(1)
        query = {}
        checkpoint = self.find_checkpoint(db)
        if checkpoint is not None:
            query['_id'] = {'$gte': ObjectId.from_datetime(checkpoint)}
            # Scenario check: only the single document inserted since the
            # previous pass should match the checkpoint filter.
            assert db.bug.find(query).count() == 1
        current_ts = int(round(time.time() * 1000))  # timestamp in ms
        return db.bug.map_reduce(
            _map, _reduce, SON([("reduce", "bug_stats")]),
            query=query, finalize=_finalize,
            scope={"ts": current_ts},
            full_response=True)

    def test_bug(self):
        """Run the two-pass scenario and assert on the reported counts."""
        db = self.m.test

        # First map-reduce populates the output collection: one row per name.
        result = self.map_reduce(db)
        assert result['counts']['output'] == 2

        # Incremental pass: add one document and reduce it into the totals.
        time.sleep(5)
        db.bug.insert({'data': 2, 'name': 'b'})
        result = self.map_reduce(db)
        # NOTE: the original author questioned whether 'output' should be 1
        # here, since only one key is touched; the assertion is kept as
        # originally written.
        assert result['counts']['output'] == 2
        assert result['counts']['input'] == 1

        # Look the row up by key instead of relying on unsorted find() order.
        # Five original 'b' rows (data == 1 each) plus the new data == 2 row.
        stats_b = db.bug_stats.find_one({'_id': 'b'})
        assert stats_b['value']['data'] == 7.0
# Run the reproduction scenario when this file is executed as a script.
if __name__ == '__main__':
    scenario = Test()
    scenario.test_bug()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment