Skip to content

Instantly share code, notes, and snippets.

@SpainTrain
Created September 18, 2020 05:51
Show Gist options
  • Save SpainTrain/1460db52bc4cf4126eb2f6b8158d59ff to your computer and use it in GitHub Desktop.
Approach to migrations in Google Cloud Datastore
# From https://cloud.google.com/appengine/articles/deferred
import logging
from google.appengine.ext import deferred, ndb
from google.appengine.runtime import DeadlineExceededError
class Mapper(object):
    """Walks every entity of a Datastore kind, applying :meth:`map` to each.

    Work is batched, and when an App Engine request deadline is hit the scan
    re-queues itself as a deferred task, resuming from the last processed key.
    Subclasses override ``KIND`` (and optionally ``FILTERS``, ``map`` and
    ``finish``) to implement a concrete migration.
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to filter by.
    FILTERS = []

    def __init__(self):
        # Buffers of pending writes/deletes, flushed by _batch_write().
        self.to_put = []
        self.to_delete = []
        # Running total across all deferred task invocations (survives via
        # pickling of self when the task is re-queued).
        self.entities_processed_count = 0

    def map(self, entity):  # pylint: disable=unused-argument
        '''Updates a single entity.
        Implementers should return a tuple containing two iterables (to_update, to_delete).
        *IMPORTANT*: When used for migrations, this function should be idempotent!
        (https://en.wikipedia.org/wiki/Idempotence#Computer_science_meaning)
        '''
        return ([], [])

    def finish(self):
        '''Called when the mapper has finished, to allow for any final work to be done.'''
        pass

    def get_query(self):
        '''Returns a query over the specified kind, with any appropriate filters applied.'''
        query = self.KIND.query()
        for prop, value in self.FILTERS:
            query = query.filter(prop == value)
        # Key order makes the resume filter in _continue() well-defined.
        return query.order(getattr(self.KIND, '_key'))

    def run(self, batch_size=100):
        '''Starts the mapper running.'''
        self._continue(None, batch_size)

    def _batch_write(self):
        '''Writes updates and deletes entities in a batch.'''
        if self.to_put:
            ndb.put_multi(self.to_put)
            logging.info('[%s] Updated %d entities', self.__class__.__name__, len(self.to_put))
            self.to_put = []
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            logging.info('[%s] Deleted %d entities', self.__class__.__name__, len(self.to_delete))
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        '''Processes entities after start_key, re-deferring itself on deadline.'''
        query = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key:
            key_prop = getattr(self.KIND, '_key')
            query = query.filter(key_prop > start_key)
        # Keep updating records until we run out of time.
        try:
            for position, entity in enumerate(query):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Flush the buffers every batch_size entities.
                if (position + 1) % batch_size == 0:
                    self._batch_write()
                # Remember the last entity we processed, for resumption.
                start_key = entity.key
                self.entities_processed_count += 1
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return
        logging.info(
            '[%s] Finished processing %d %s entities',
            self.__class__.__name__,
            self.entities_processed_count,
            self.KIND.__name__,
        )
        self.finish()
class SomeModelMigration(Mapper):
    """Example migration: rewrites ``some_field`` on each SomeModel entity."""

    KIND = SomeModel

    def map(self, data):
        '''Pure business logic: returns (to_update, to_delete) for one entity.

        Idempotent: an already-migrated entity yields empty buffers, so the
        whole migration can safely be re-run after an interruption.
        '''
        # BUG FIX: the original compared with `is`, which tests object
        # identity and only appears to work when CPython happens to intern
        # the string; `==` is the correct value comparison.
        if data.some_field == 'magic string':
            data.some_field = 'different magic string'
            return ([data], [])
        return ([], [])
def run_some_migration():
    """Kicks off the SomeModel migration on the deferred task queue."""
    deferred.defer(SomeModelMigration().run)
@SpainTrain
Copy link
Author

Note that the map function is pure and contains only business logic, so unit testing is straightforward.

Idempotence is critical, so unit tests should include idempotence tests. Such a test only needs to verify that running map again on an already-migrated entity returns empty update and delete buffers. This guarantees the entire migration can be safely rerun if it is interrupted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment