Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
A custom appengine-mapreduce input_reader and output_writer. The ContentRevisionsInputReader iterates over a complex data structure we use to keep track of modifications to each of the content nodes within the Khan Academy content management system. This is a utility class that is used to intelligently rollout schema changes to these nodes.
class ContentRevisionsInputReader(
        third_party.mapreduce.input_readers.InputReader):
    """Mapper input reader that loads the latest commit's revisions.

    Work is divided by stride: each shard is constructed with a distinct
    start offset and a step equal to the number of shards, so together
    the shards cover every revision key exactly once.

    NOTE(review): the base class was truncated in the pasted source; it
    is inferred to be InputReader from the split_input/validate/
    from_json/to_json interface implemented below -- confirm against the
    original file.
    """

    def __init__(self, kinds, start, step, current=0):
        """Create a reader for one shard.

        Args:
            kinds: list of content kind names to filter on.
            start: start index into the commit's revision-key array.
            step: stride through the revision-key array (the shard count).
            current: resume position within this shard's filtered list.
        """
        # List of content kinds to filter on.
        self._kinds = kinds
        # Start index in the snapshot array.
        self._start = start
        # Stride through the snapshot array.
        self._step = step
        # Index in the resulting filtered list we are currently
        # traversing; persisted via to_json() so an interrupted shard
        # can resume where it left off.
        self._current = current

    def __iter__(self):
        """Yield each revision entity this shard is responsible for."""
        commit = content.models.ContentCommit.get_editing_head()
        # Given a start and a step (stride), get the set of revision
        # keys this shard is responsible for.
        keys = commit.get_revision_keys()[self._start::self._step]
        # Filter out revision keys that are not in our kinds filter.
        # (This filters out a variable number, so different shards will
        # process a slightly different number of revisions.)
        # NOTE(review): the original filter(...) call was cut off in the
        # paste; the second argument is assumed to be `keys`.
        keys = [k for k in keys
                if k.kind().split("Revision")[0] in self._kinds]
        # Get the revision entities from the keys.
        revisions = compat_key.maybe_get_multi(keys)
        # Skip the first _current items (if we are interrupted, our
        # current position is saved in _current).
        for revision in revisions[self._current:]:
            self._current += 1
            yield revision

    @classmethod
    def split_input(cls, mapper_spec):
        """Divide mapping work evenly among a set of input readers.

        This function is called once to create a set of
        ContentRevisionsInputReader instances that will each traverse
        its own distinct set of revisions.  Each reader starts at a
        different offset and steps forward by N, where N is the number
        of readers (shards).

        Args:
            mapper_spec: the mapper specification; params must contain a
                comma-separated "kinds" entry.

        Returns:
            A list of shard_count reader instances.
        """
        params = third_party.mapreduce.input_readers._get_params(mapper_spec)
        kinds = params["kinds"].split(",")
        shard_count = mapper_spec.shard_count
        return [cls(kinds, idx, shard_count)
                for idx in xrange(0, shard_count)]

    @classmethod
    def validate(cls, mapper_spec):
        """Validate the mapper specification.

        NOTE(review): the body of this method was lost in the pasted
        source; restored as a no-op -- confirm against the original.
        """
        pass

    @classmethod
    def from_json(cls, json):
        """Rebuild a reader from the checkpoint state made by to_json()."""
        return cls(json["kinds"], json["start"], json["step"],
                   json["current"])

    def to_json(self):
        """Serialize this reader's state so a shard can be resumed."""
        return {"kinds": self._kinds, "start": self._start,
                "step": self._step, "current": self._current}
class CustomMultiFileOutputWriter(
        third_party.mapreduce.output_writers.FileOutputWriter):
    """Output writer that splits mapper output across multiple files.

    Records whose leading integer is even go to the '-even' file and the
    rest go to the '-odd' file.  All files are created once, up front, in
    init_job(); each shard then appends to the shared files through the
    context's file pool.
    """

    def __init__(self, filenames):
        """Create a writer over an already-created set of files.

        Args:
            filenames: the writable filenames produced by init_job().
        """
        self._filenames = filenames

    @classmethod
    def _get_all_filenames(cls, mapreduce_state_or_ctx):
        """Generate filenames for this mapreduce job.

        The generator pattern is used for fun; it could just return an
        array.

        Args:
            mapreduce_state_or_ctx: may either be a model.MapreduceState
                or a context.Context object.  Thankfully, the members we
                are interested in have the same name in both cases.

        Yields:
            One filename per suffix, of the form "<name>-<id><suffix>".
        """
        # NOTE(review): the right-hand side of this assignment was cut
        # off in the pasted source; mapreduce_spec.name mirrors the
        # mapreduce_id access below -- confirm against the original.
        mr_name = mapreduce_state_or_ctx.mapreduce_spec.name
        mr_id = mapreduce_state_or_ctx.mapreduce_spec.mapreduce_id
        file_suffixes = ['-even', '-odd']
        for suffix in file_suffixes:
            yield "%s-%s%s" % (mr_name, mr_id, suffix)

    @classmethod
    def init_job(cls, mapreduce_state):
        """Initialize job-level writer state.

        We must create all files that we will be writing to at the
        beginning, otherwise we would risk multiple instances of
        CustomMultiFileOutputWriter running on separate shards
        concurrently creating the same files.  At least, I think that's
        right.  You could have each shard generate its own set of files,
        and then stitch them together in finalize_job(), but that might
        get hairy.

        Note: Most of this code is copied from
        FileOutputWriterBase.init_job().

        Args:
            mapreduce_state: an instance of model.MapreduceState
                describing the current job.
        """
        # NOTE(review): the `try:` line was lost in the pasted source;
        # restored to match the orphaned `except` below.
        try:
            logging.error('CustomMultiFileOutputWriter.init_job() called %s'
                          % mapreduce_state)
            mapper_spec = mapreduce_state.mapreduce_spec.mapper
            params = third_party.mapreduce.output_writers._get_params(
                mapper_spec)
            mime_type = params.get("mime_type", "application/octet-stream")
            filesystem = cls._get_filesystem(mapper_spec=mapper_spec)
            acl = params.get(cls.GS_ACL_PARAM, "project-private")
            filenames = []
            request_filenames = []
            for filename in cls._get_all_filenames(mapreduce_state):
                system_filename = cls._create_file(
                    filesystem, filename, mime_type, acl=acl)
                # NOTE(review): these two appends were lost in the pasted
                # source; they are required for the _State built below to
                # be non-empty.  FileOutputWriterBase records the system
                # filename and the requested filename -- confirm order
                # against the original.
                filenames.append(system_filename)
                request_filenames.append(filename)
                logging.error('Created file %s as %s',
                              filename, system_filename)
            # Note: FileOutputWriterBase.get_filenames() returns these
            # filenames, so if you do anything different you may have to
            # override that function.
            mapreduce_state.writer_state = cls._State(
                filenames, request_filenames).to_json()
        except Exception as e:
            logging.error(
                'CustomMultiFileOutputWriter.init_job() unhandled '
                'exception %s' % e)

    @classmethod
    def create(cls, mapreduce_state, shard_number):
        """Create a new writer for a shard.

        Args:
            mapreduce_state: an instance of model.MapreduceState
                describing the current job.
            shard_number: shard number as integer.

        Returns:
            A writer instance sharing the job-level filenames.
        """
        state = cls._State.from_json(mapreduce_state.writer_state)
        return cls(state.filenames)

    def write(self, data, ctx):
        """Write one record to the file selected by its leading integer.

        Args:
            data: actual data yielded from the handler; expected to be a
                string whose content up to the final character parses as
                an int (e.g. a number followed by a newline).
            ctx: an instance of context.Context.
        """
        # NOTE(review): the `try:` line was lost in the pasted source;
        # restored to match the orphaned `except` below.
        try:
            logging.error('custom writer.write invoked with %s' % data)
            # NOTE(review): the body of this branch was lost in the
            # pasted source; FileOutputWriterBase registers a file pool
            # on the context here -- confirm against the original.
            if ctx.get_pool("file_pool") is None:
                pass
            all_files = self._filenames
            # Figure out which file this data belongs in: even leading
            # integers go to file 0 ('-even'), odd ones to file 1
            # ('-odd').
            my_file = all_files[int(data[:-1]) % 2]  # ha!
            logging.error('Writing %s to %s' % (data, my_file))
            ctx.get_pool("file_pool").append(my_file, str(data))
        except Exception as e:
            logging.error('write unhandled exception: %s' % e)

    @classmethod
    def from_json(cls, state):
        """Rebuild a writer from the state produced by to_json()."""
        logging.error('from_json received state %s', state)
        return cls(state["filenames"])

    def to_json(self):
        """Serialize this writer's state for checkpointing."""
        return {"filenames": self._filenames}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment