Skip to content

Instantly share code, notes, and snippets.

@victorpoluceno
Created December 5, 2012 14:28
Incremental map reduce and ObjectId timestamp
import time
import datetime
import pymongo
from bson.objectid import ObjectId
from bson.code import Code
from bson.son import SON
# Fixture documents for the ``bug`` collection: five records named 'a'
# followed by five named 'b', each with ``data`` equal to 1.  Every entry
# is its own dict object (no sharing between list slots).
DATA_SAMPLE = [
    {'data': 1, 'name': label}
    for label in ('a', 'b')
    for _ in range(5)
]
# JavaScript map function: for each input document, emits one pair whose key
# is the document's ``name`` and whose value is ``{'data': <document data>}``.
_map = Code("""function (){
emit(this.name, {'data': this.data});
};""")
# JavaScript reduce function: adds up the ``data`` field of every value
# received for a key and returns the total as ``{data: sum}`` -- the same
# shape the map function emits.
_reduce = Code("""function (key, values){
var sum = 0;
values.forEach(function(doc){
sum += doc.data;
});
return {data: sum};
};""")
# JavaScript finalize function: stores ``ts`` on the reduced value before
# returning it.  ``ts`` is not defined inside the function; the map_reduce
# call in this file supplies it through the ``scope`` argument.
_finalize = Code("""function (key, reducedValue) {
reducedValue.ts = ts;
return reducedValue;
};""")
class Test:
    """Scenario exercising incremental map-reduce driven by ObjectId time.

    The scenario fills ``test.bug`` with ``DATA_SAMPLE``, aggregates it into
    ``test.bug_stats``, inserts one more document and aggregates again,
    restricting the second run to documents whose ``_id`` is at or after
    the time recorded by the previous run.
    """

    def __init__(self):
        """Connect to MongoDB and reset the collections used by the scenario.

        Drops ``test.bug`` and ``test.bug_stats``, then inserts every
        document of ``DATA_SAMPLE`` into ``test.bug``.
        """
        self.m = pymongo.MongoClient(fsync=True)
        self.m.test.drop_collection('bug')
        self.m.test.drop_collection('bug_stats')
        for doc in DATA_SAMPLE:
            # insert() adds an '_id' key to the dict it receives; pass a copy
            # so the module-level fixture is not mutated (a second Test()
            # would otherwise re-insert documents carrying stale ObjectIds).
            self.m.test.bug.insert(dict(doc))

    @staticmethod
    def _utc_datetime_from_ms(ts_ms):
        """Convert milliseconds since the Unix epoch to a naive UTC datetime.

        Epoch arithmetic is used instead of ``datetime.fromtimestamp`` so
        the result does not depend on the machine's local timezone.
        """
        epoch = datetime.datetime(1970, 1, 1)
        return epoch + datetime.timedelta(milliseconds=ts_ms)

    def find_checkpoint(self, db):
        """Return the time of the most recent map-reduce run.

        Args:
            db: database holding the ``bug_stats`` output collection.

        Returns:
            A naive ``datetime`` expressed in UTC, built from the largest
            ``value.ts`` found in ``bug_stats``, or ``None`` when the
            collection is empty.
        """
        # The finalize function stores the timestamp inside the reduced
        # value, so the sort key is 'value.ts' (a top-level 'ts' field does
        # not exist and sorting on it would not order anything).
        latest = db.bug_stats.find_one(
            sort=[('value.ts', pymongo.DESCENDING)])
        if latest is None:
            return None
        # ObjectId.from_datetime() interprets naive datetimes as UTC, so the
        # checkpoint must be UTC rather than local time.
        return self._utc_datetime_from_ms(latest['value']['ts'])

    def map_reduce(self, db):
        """Aggregate ``bug`` into ``bug_stats``, incrementally when possible.

        When a checkpoint exists, only documents whose ``_id`` is greater
        than or equal to the ObjectId generated from that checkpoint are fed
        to the map function; otherwise the whole collection is processed.

        Args:
            db: database holding the ``bug`` and ``bug_stats`` collections.

        Returns:
            The full response document of the map-reduce command.

        Raises:
            AssertionError: if a checkpoint exists and the incremental query
                does not match exactly one document (the situation
                ``test_bug`` sets up).
        """
        # Workaround kept from the original script: an ObjectId timestamp
        # has one-second resolution, so without this pause documents handled
        # by the previous run can fall in the same second as the checkpoint
        # and be matched again by the '$gte' filter.
        time.sleep(1)

        query = {}
        checkpoint = self.find_checkpoint(db)
        if checkpoint is not None:
            query['_id'] = {'$gte': ObjectId.from_datetime(checkpoint)}
            # Only the document inserted since the last run should match.
            assert db.bug.find(query).count() == 1

        current_ts = int(round(time.time() * 1000))  # timestamp in ms
        return db.bug.map_reduce(
            _map, _reduce, SON([("reduce", "bug_stats")]),
            query=query, finalize=_finalize,
            scope={"ts": current_ts},
            full_response=True)

    def test_bug(self):
        """Run the two-pass scenario and assert on the reported results."""
        # First map-reduce populates the output collection.
        result = self.map_reduce(self.m.test)
        assert result['counts']['output'] == 2

        # Incremental map-reduce after inserting one new document.
        time.sleep(5)
        self.m.test.bug.insert({'data': 2, 'name': 'b'})
        result = self.map_reduce(self.m.test)

        # NOTE(review): the original author expected a single output line
        # here but observed 2 -- presumably the count covers the whole
        # 'bug_stats' collection in "reduce" output mode; confirm against
        # the MongoDB documentation.
        assert result['counts']['output'] == 2
        assert result['counts']['input'] == 1

        # Look the 'b' aggregate up by key instead of relying on the order
        # in which find() happens to return the documents.
        b_stats = self.m.test.bug_stats.find_one({'_id': 'b'})
        assert b_stats['value']['data'] == 7.0
if __name__ == '__main__':
    # Executed as a script: set up the fixtures, then run the scenario.
    scenario = Test()
    scenario.test_bug()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment