Skip to content

Instantly share code, notes, and snippets.

@emlynoregan
Last active August 29, 2015 14:10
Show Gist options
  • Save emlynoregan/9604b963901bfb63a752 to your computer and use it in GitHub Desktop.
Save emlynoregan/9604b963901bfb63a752 to your computer and use it in GitHub Desktop.
Datastore Mapper with Distributed Promises for AppEngine
# This is a generic map function for the datastore.
# Suitable for passing to the PromiseSpace.when() method.
# outerresolve: this function is called when the mapping is complete, with the full result.
# promiseSpace: a promise space, to be used for making promises.
# dsQuery: an ndb datastore query.
# mapF: an optional map function. Takes an object or key (depending on keyOnly) and returns something.
# reduceF: an optional reduce function. Takes a list of results of mapping and a previous return value
# from reduceF, and returns a new value
# keyOnly: determines whether the queries fetch objects or just keys
# pageSize: size of the pages fetched from the query
# returns:
# if no mapF, no reduceF:
# returns a list of items or keys fetched from the query. Be careful not to get too many!
# if has mapF but no reduceF:
# returns a list of results from mapF. Be careful not to get too many!
# if has no mapF, but has reduceF:
# returns last result returned by calling reduceF on lists of keys/items
# if has both mapF and reduceF:
# returns last result returned by calling reduceF on lists of mapF results
def MapDS(outerresolve, promiseSpace, dsQuery, mapF = None, reduceF = None, keyOnly = False, pageSize = 100):
    """Page through dsQuery, mapping/reducing each page, then call outerresolve.

    Each page is fetched inside its own promise created with
    promiseSpace.when(); the promise's completion callback either schedules
    the next page or hands the final result to outerresolve.

    outerresolve: called once with the final result, or with an exception
        object if chaining to the next page fails.
    promiseSpace: object providing when(f, *args) -> promise with then(cb).
        NOTE(review): when() is assumed to invoke f(resolve, *args) and to
        pass cb an object exposing .value - confirm against PromiseSpace.
    dsQuery: query object providing fetch_page(pageSize, ...).
    mapF: optional function applied to every fetched item/key.
    reduceF: optional function(list_of_page_results, previous_value).
    keyOnly: forwarded to fetch_page as keys_only.
    pageSize: number of results requested per page.
    """
    def ProcessPage(resolve, cumulativeResult, cursor):
        # Fetches one page, folds it into cumulativeResult and resolves with
        # the tuple (new cumulative result, more pages?, next cursor).
        try:
            if cursor:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, start_cursor = cursor, keys_only = keyOnly)
            else:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, keys_only = keyOnly)
            # Build a real list, not a generator: the no-reduceF path stores
            # this value and later calls .extend() on it, and reduceF is
            # documented as receiving a list.
            llocalResult = [mapF(litem) if mapF else litem for litem in litems]
            if reduceF:
                resolve((reduceF(llocalResult, cumulativeResult), lmore, lnextCursor))
            else:
                if cumulativeResult is None:
                    cumulativeResult = llocalResult
                else:
                    cumulativeResult.extend(llocalResult)
                resolve((cumulativeResult, lmore, lnextCursor))
        except Exception as ex:
            logging.exception("error in ProcessPage")
            # Resolve with the exception so the chain still reaches NextPage.
            resolve(ex)

    def NextPage(result):
        # Completion callback for a page promise: finish, or chain the next page.
        try:
            lcumulativeResult, lmore, lcursor = result.value
            if not lmore:
                outerresolve(lcumulativeResult)
            else:
                lpromise = promiseSpace.when(ProcessPage, lcumulativeResult, lcursor)
                lpromise.then(NextPage)
        except Exception as ex:
            logging.exception("error in NextPage")
            outerresolve(ex)

    # Kick off the first page with no accumulated result and no cursor.
    lpromise = promiseSpace.when(ProcessPage, None, None)
    lpromise.then(NextPage)
# This is a generic counting function, which can count arbitrarily large data sets.
# Suitable for passing to the PromiseSpace.when() method.
# outerresolve: this function is called when the counting is complete, with the count.
# promiseSpace: a promise space, to be used for making promises.
# dsQuery: an ndb datastore query to count elements of
# progressF: if present, is called periodically, with the subtotal.
def CountDS(outerresolve, promiseSpace, dsQuery, progressF = None):
    """Count the results of dsQuery by delegating to MapDS.

    Every fetched key is mapped to 1 and each page's ones are summed into a
    running subtotal, so only a single integer is carried between pages.

    outerresolve: passed through to MapDS; receives the final count.
    promiseSpace: passed through to MapDS.
    dsQuery: the query whose results are counted (fetched keys-only).
    progressF: optional callable invoked with the running subtotal after
        each page; skipped when not supplied.
    """
    def CountMap(itemkey):
        return 1

    def CountReduce(counts, subtotal):
        # subtotal is None on the first page, so treat a falsy value as 0.
        result = (subtotal if subtotal else 0) + sum(counts)
        # progressF is optional; calling None here would fail every page.
        if progressF:
            progressF(result)
        return result

    # Final positional True is MapDS's keyOnly flag: keys are enough to count.
    return MapDS(outerresolve, promiseSpace, dsQuery, CountMap, CountReduce, True)
# An example of how you might use CountDS
class CountResult(ndb.Model):
    # Stores the outcome of one named count.
    countname = ndb.StringProperty()
    result = ndb.IntegerProperty()

    def Count(self, dsQuery):
        """Start counting dsQuery and store the total on this entity when done.

        The entity must already have been put(), because the completion
        callback reloads it through self.key.
        """
        def UpdateSelf(result):
            try:
                # Must reload: the captured self will be an old serialised
                # version. Use a separate local name - assigning to `self`
                # here would make it local to UpdateSelf and raise
                # UnboundLocalError on the right-hand side.
                lself = self.key.get()
                lself.result = result.value
                lself.put()
            except Exception:
                logging.exception("Counting failed")

        promiseSpace = PromiseSpace(self.countname)
        # CountDS takes (outerresolve, promiseSpace, dsQuery, ...), so the
        # promise space itself has to be passed as the first extra argument.
        # NOTE(review): assumes when(f, *args) calls f(resolve, *args), as
        # MapDS's own use of when() implies - confirm against PromiseSpace.
        promiseSpace.when(CountDS, promiseSpace, dsQuery).then(UpdateSelf)

# Properties must be given as keyword arguments, and the entity is saved
# before Count() so that it has a key for the callback to reload.
lcountResult = CountResult(countname = "MyNdbObjectCount")
lcountResult.put()
lcountResult.Count(MyNdbObject.query())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment