Skip to content
Create a gist now

Instantly share code, notes, and snippets.

Periodically-updating pymongo/MongoDB incremental MapReduce example
def incremental_map_reduce(
counter_table_name = "IncrementalMRCounters",
counter_key = None,
max_datetime = None,
reset = False,
force = False):
""" This method performs an incremental map-reduce on any new data in 'source_table_name'
into 'target_table_name'. It can be run in a cron job, for instance, and on each execution will
process only the new, unprocessed records.
The set of data to be processed incrementally is determined non-invasively (meaning the source table is not
written to) by using the queued_date field 'source_queued_date_field_name'. When a record is ready to be processed,
simply set its queued_date (which should be indexed for efficiency). When incremental_map_reduce() is run, any documents
with queued_dates between the counter in 'counter_key' and 'max_datetime' will be map/reduced.
If reset is True, it will clear 'target_table_name' and do a map reduce across all records older
than max_datetime.
If unspecified/None, counter_key defaults to counter_table_name:LastMaxDatetime.
now =
if max_datetime is None:
max_datetime = now
if reset:
logging.debug("Resetting, dropping table " + target_table_name)
time_limits = { "$lt" : max_datetime }
if counter_key is None:
counter_key = target_table_name + ":LastMaxDatetime"
# If we've run before, filter out anything that we've processed already.
last_max_datetime = None
last_max_datetime_record = db[counter_table_name].find_and_modify(
{'_id': counter_key},
{'$set': { 'inprogress': True}, '$push': { 'm': now } },
upsert = True
if force or last_max_datetime_record is None or not last_max_datetime_record.has_key('inprogress'):
# first time ever run, or forced to go ahead anyway
if last_max_datetime_record['inprogress']:
if last_max_datetime_record['m'][0] < now - datetime.timedelta(hours = 2):
# lock timed out, so go ahead...
logging.error(target_table_name + " lock is old. Ignoring it, but something was broken that caused it to not be unlocked...")
logging.warning(target_table_name + " mapreduce already in progress, skipping...")
raise RuntimeError(target_table_name + " locked since %s. Skipping..." % last_max_datetime_record['m'][0])
if not reset:
if last_max_datetime_record is not None:
last_max_datetime = last_max_datetime_record['value']
time_limits['$gt'] = last_max_datetime
logging.debug('~FR limit last_max_datetime = %s' % (last_max_datetime,))
except KeyError:
# This happened on staging. i guess it crashed somehow
# between the find_and_modify and the final update?
logging.error("~FR no value on message!")
query = { source_queued_date_field_name: time_limits }
ret = db[source_table_name].map_reduce(
out = { 'reduce' : target_table_name },
query = query,
full_response = True
num_processed = ret['counts']['input']
# Update our counter so we remember for the next pass.
already_processed_through = db[counter_table_name].update(
{'_id': counter_key},
{'$set': { 'inprogress': False, 'value': max_datetime }, '$unset': {'m': 1}},
upsert = False,
multi = False,
safe = True)
logging.debug("Processed %d completed surveys from %s through %s.\nmap_reduce details: %s" % (num_processed, last_max_datetime, max_datetime, ret))
return ret

Nice code. This looks generic except for the query referencing a 'cpl' field in the docs of the source table that contains the timestamp of the source documents. Or am I misunderstanding?

ses4j commented Jul 3, 2012

@jhasselkus: Thanks for the comment, you interpreted it right and I missed that. Replaced harcoded "cpl" with passed-in "source_queued_date_field_name". Added documentation to explain.


Thanks it helps a lot.
I had to change the 29th line to: now = datetime.datetime.utcnow() in order to be able process tweets by taking "created_at" field into account.

bersany commented Jan 10, 2014

Thanks! I rewrote a customized version of it for node.js and mongoose and it works flawlessly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.