Last active
August 29, 2015 14:10
-
-
Save emlynoregan/9604b963901bfb63a752 to your computer and use it in GitHub Desktop.
Datastore Mapper with Distributed Promises for AppEngine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def MapDS(outerresolve, promiseSpace, dsQuery, mapF=None, reduceF=None, keyOnly=False, pageSize=100):
    """Generic map (and optional reduce) over the results of an ndb datastore query.

    Suitable for passing to the PromiseSpace.when() method. The query is read
    one page at a time; each page is processed in its own promise created via
    promiseSpace.when(), and the next page is scheduled from the previous
    page's continuation.

    Args:
        outerresolve: called when the mapping is complete, with the full
            result (or with an exception if a step failed).
        promiseSpace: a promise space, used for making the per-page promises.
        dsQuery: an ndb datastore query.
        mapF: optional map function. Takes an entity or key (depending on
            keyOnly) and returns something.
        reduceF: optional reduce function. Takes a list of (mapped) results
            for one page and the previous return value from reduceF (None for
            the first page), and returns a new value.
        keyOnly: if True, the query fetches keys instead of entities.
        pageSize: number of results fetched per page.

    Value passed to outerresolve:
        no mapF, no reduceF: a list of entities/keys fetched from the query.
            Be careful not to fetch too many!
        mapF but no reduceF: a list of mapF results. Be careful not to fetch
            too many!
        reduceF but no mapF: the last result of calling reduceF on lists of
            keys/entities.
        mapF and reduceF: the last result of calling reduceF on lists of mapF
            results.
    """
    def ProcessPage(resolve, cumulativeResult, cursor):
        # Fetch one page, fold it into the running result, and resolve with
        # (result so far, whether more pages exist, cursor for the next page).
        try:
            if cursor:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, start_cursor=cursor, keys_only=keyOnly)
            else:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, keys_only=keyOnly)
            # Must be a real list, not a generator: it is extended below, handed
            # to reduceF as a list, and passed between promises as a value.
            llocalResult = [mapF(litem) if mapF else litem for litem in litems]
            if reduceF:
                resolve((reduceF(llocalResult, cumulativeResult), lmore, lnextCursor))
            else:
                if cumulativeResult is None:
                    cumulativeResult = llocalResult
                else:
                    cumulativeResult.extend(llocalResult)
                resolve((cumulativeResult, lmore, lnextCursor))
        except Exception as ex:
            logging.exception("error in ProcessPage")
            resolve(ex)

    def NextPage(result):
        # Either finish (no more pages) or schedule the next page.
        try:
            lcumulativeResult, lmore, lcursor = result.value
            if not lmore:
                outerresolve(lcumulativeResult)
            else:
                lpromise = promiseSpace.when(ProcessPage, lcumulativeResult, lcursor)
                lpromise.then(NextPage)
        except Exception as ex:
            logging.exception("error in NextPage")
            outerresolve(ex)

    lpromise = promiseSpace.when(ProcessPage, None, None)
    lpromise.then(NextPage)
def CountDS(outerresolve, promiseSpace, dsQuery, progressF=None, pageSize=100):
    """Generic counting function for the results of an ndb datastore query.

    Only keys are fetched, and only a running subtotal is carried from page to
    page. Suitable for passing to the PromiseSpace.when() method.

    Args:
        outerresolve: called when the counting is complete, with the count.
        promiseSpace: a promise space, used for making promises.
        dsQuery: an ndb datastore query whose results are counted.
        progressF: optional; if given, called after each page with the
            running subtotal.
        pageSize: number of keys fetched per page.
    """
    def CountMap(itemkey):
        # Every fetched key contributes exactly one to the count.
        return 1

    def CountReduce(counts, subtotal):
        # subtotal is None for the first page.
        result = (subtotal or 0) + sum(counts)
        # progressF is optional; calling it unconditionally would raise a
        # TypeError on every page when it is left as None.
        if progressF is not None:
            progressF(result)
        return result

    return MapDS(outerresolve, promiseSpace, dsQuery, CountMap, CountReduce, keyOnly=True, pageSize=pageSize)
# An example of how you might use CountDS.
class CountResult(ndb.Model):
    """Datastore entity holding the result of a (possibly long-running) count.

    Count() starts a distributed CountDS run. When that finishes, the count is
    written back to this entity's `result` property.
    """
    countname = ndb.StringProperty()  # name of this count; also used to name the promise space
    result = ndb.IntegerProperty()    # the count, written when counting completes

    def Count(self, dsQuery):
        """Start counting dsQuery's results and store the total in self.result.

        The entity must already have been put(), because the completion
        callback reloads it by key.
        """
        def UpdateSelf(result):
            try:
                # Must reload: the captured `self` will be an old serialised
                # version. Use a new name; assigning to `self` here would make
                # it local to UpdateSelf and raise UnboundLocalError on self.key.
                lself = self.key.get()
                lself.result = result.value
                lself.put()
            except Exception:
                logging.exception("Counting failed")

        promiseSpace = PromiseSpace(self.countname)
        # when() invokes its function as f(resolve, *args) (see how MapDS uses
        # it with ProcessPage), so CountDS's promiseSpace argument must be
        # passed explicitly.
        promiseSpace.when(CountDS, promiseSpace, dsQuery).then(UpdateSelf)
# Example usage. ndb models take properties as keyword arguments. The entity
# must be stored before Count() is called, because the completion callback
# reloads it by key.
lcountResult = CountResult(countname="MyNdbObjectCount")
lcountResult.put()
lcountResult.Count(MyNdbObject.query())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment