Created December 7, 2009 17:07
-
-
Save arikfr/250932 to your computer and use it in GitHub Desktop.
Mapper base class using deferred tasks. Based on example code by Nick Johnson (http://code.google.com/appengine/articles/deferred.html).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError
class Mapper(object):
    """Base class for visiting every entity of a datastore kind in batches.

    Each batch is fetched and processed in one request/task, and the next
    batch is chained with ``deferred.defer``. A batch interrupted by
    ``DeadlineExceededError`` flushes its pending writes and re-queues itself
    from the last fully processed entity.

    Subclasses set ``KIND`` (and optionally ``ORDER_BY`` / ``FILTERS``) and
    override ``map`` and ``finish``.

    NOTE(review): batches resume with a strict ``ORDER_BY > last_value``
    filter. When ``ORDER_BY`` is not ``'__key__'`` its values should
    therefore be unique; entities sharing the value at a batch boundary
    would be skipped. ``ORDER_BY`` is also read back from entities with
    ``getattr``, so it must be a plain property name, not a sort expression.
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None
    # Subclasses can replace this with the property the entities should be ordered by.
    ORDER_BY = '__key__'
    # Subclasses can replace this with a list of (property, value) tuples to filter by.
    # It is only ever read, so sharing this class-level list is safe.
    FILTERS = []

    def __init__(self):
        # Pending writes, flushed by _batch_write() at the end of each batch.
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete). Their items are passed to ``db.put`` and
        ``db.delete`` respectively. The default implementation changes nothing.
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied.

        Raises:
          NotImplementedError: if the subclass did not set ``KIND``.
        """
        if self.KIND is None:
            raise NotImplementedError(
                "%s must set KIND to a model class" % type(self).__name__)
        q = self.KIND.all()
        for prop, value in self.FILTERS:
            q.filter("%s =" % prop, value)
        q.order(self.ORDER_BY)
        return q

    def run(self, batch_size=100):
        """Starts the mapper running.

        The first batch is processed synchronously in the calling request.
        Later batches run as deferred tasks.

        Args:
          batch_size: maximum number of entities fetched per batch.
        """
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            db.put(self.to_put)
            self.to_put = []
        if self.to_delete:
            db.delete(self.to_delete)
            self.to_delete = []

    def _order_key(self, entity):
        """Return the ORDER_BY value of *entity*, used as the resume point."""
        if self.ORDER_BY == '__key__':
            return entity.key()
        return getattr(entity, self.ORDER_BY)

    def _continue(self, start_key, batch_size):
        """Process one batch, then chain the next batch or call finish().

        Args:
          start_key: ORDER_BY value of the last entity handled by the
            previous batch, or None to start from the beginning.
          batch_size: maximum number of entities to fetch in this batch.
        """
        q = self.get_query()
        # If we're resuming, pick up where we left off last time. Compare
        # against None (not truthiness) so a resume value like 0 or "" is
        # not mistaken for "start over".
        if start_key is not None:
            q.filter("%s >" % self.ORDER_BY, start_key)
        # Default to this batch's own starting point. Then a deadline hit
        # before any entity is processed resumes here instead of restarting
        # the whole run.
        continue_key = start_key
        processed = 0
        try:
            res = q.fetch(batch_size)
            for entity in res:
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                continue_key = self._order_key(entity)
                processed += 1
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, continue_key, batch_size)
            return
        # A short (or empty) batch means the query is exhausted, so there is
        # no need to queue another task just to find zero results. The
        # explicit zero check stops batch_size <= 0 from chaining forever.
        if processed == 0 or processed < batch_size:
            self.finish()
        else:
            deferred.defer(self._continue, continue_key, batch_size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.