Skip to content

Instantly share code, notes, and snippets.

@arikfr
Created December 7, 2009 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arikfr/250932 to your computer and use it in GitHub Desktop.
Save arikfr/250932 to your computer and use it in GitHub Desktop.
Mapper base class using deferred tasks. Based on example code by Nick Johnson (http://code.google.com/appengine/articles/deferred.html).
from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError
class Mapper(object):
    """Base class for applying an operation to every entity of a kind.

    Entities are processed one batch per task; each batch schedules the next
    one through ``deferred.defer`` until a batch comes back empty, at which
    point ``finish()`` is called.

    Subclasses set ``KIND`` (and optionally ``ORDER_BY`` / ``FILTERS``) and
    override ``map()`` and, if needed, ``finish()``.
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with the property the entities should be
    # ordered by.
    ORDER_BY = '__key__'

    # Subclasses can replace this with a list of (property, value) tuples to
    # filter by.
    FILTERS = []

    def __init__(self):
        # Pending writes and deletes, flushed by _batch_write().
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete). The default implementation changes nothing.
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work
        to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate
        filters applied and ordered by ``ORDER_BY``."""
        q = self.KIND.all()
        for prop, value in self.FILTERS:
            q.filter("%s =" % prop, value)
        q.order(self.ORDER_BY)
        return q

    def run(self, batch_size=100):
        """Starts the mapper running.

        ``batch_size`` is the number of entities fetched and processed per
        task.
        """
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes pending updates and deletes pending entities in a batch,
        then clears the corresponding pending list."""
        if self.to_put:
            db.put(self.to_put)
            self.to_put = []
        if self.to_delete:
            db.delete(self.to_delete)
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        """Processes one batch, then either reschedules itself or finishes.

        ``start_key`` is the ``ORDER_BY`` value (or entity key) of the last
        entity handled by the previous batch, or None when starting from the
        beginning. ``batch_size`` is the number of entities to fetch.
        """
        q = self.get_query()
        # If we're resuming, pick up where we left off last time. Compare
        # against None explicitly so that falsy resume values (0, '') are
        # honoured instead of silently restarting from the beginning.
        if start_key is not None:
            q.filter("%s >" % self.ORDER_BY, start_key)

        # ORDER_BY value of the last entity fully mapped in this batch.
        continue_key = None
        try:
            for entity in q.fetch(batch_size):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Record the last entity we processed.
                if self.ORDER_BY == '__key__':
                    continue_key = entity.key()
                else:
                    continue_key = getattr(entity, self.ORDER_BY)
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off. If nothing in
            # this batch completed, resume from where this batch started
            # rather than from the very beginning.
            if continue_key is None:
                resume_key = start_key
            else:
                resume_key = continue_key
            deferred.defer(self._continue, resume_key, batch_size)
            return

        if continue_key is not None:
            # At least one entity was processed; there may be more.
            deferred.defer(self._continue, continue_key, batch_size)
        else:
            # The batch was empty: nothing left to map.
            self.finish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment