Last active
August 21, 2021 08:11
-
-
Save MattFaus/6786120 to your computer and use it in GitHub Desktop.
A custom appengine-mapreduce input_reader and output_writer. The ContentRevisionsInputReader iterates over a complex data structure we use to keep track of modifications to each of the content nodes within the Khan Academy content management system. This is a utility class that is used to intelligently rollout schema changes to these nodes. The …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ContentRevisionsInputReader(
        third_party.mapreduce.input_readers.InputReader):
    """Mapper input reader over the latest commit's content revisions.

    Each shard walks the editing head's revision-key list with a
    start/stride pattern (shard i takes keys[i::num_shards]), so the
    shards partition the keys without coordinating with each other.
    """

    def __init__(self, kinds, start, step, current=0):
        # List of content kinds to filter on (e.g. ["Video", "Exercise"]).
        self._kinds = kinds
        # Start index in the snapshot array (this shard's offset).
        self._start = start
        # Stride through the snapshot array (== number of shards).
        self._step = step
        # Index in the resulting filtered list we are currently
        # traversing; lets a resumed shard skip already-processed items.
        self._current = current

    def __iter__(self):
        commit = content.models.ContentCommit.get_editing_head()
        # Given a start and a step (stride), take the slice of revision
        # keys this shard is responsible for.
        keys = commit.get_revision_keys()[self._start::self._step]
        # Drop revision keys whose kind is not in our kinds filter.
        # (This filters out a variable number, so different shards will
        # process a slightly different number of revisions.)
        keys = [k for k in keys
                if k.kind().split("Revision")[0] in self._kinds]
        # Fetch the actual revision entities for the surviving keys.
        revisions = compat_key.maybe_get_multi(keys)
        # Skip the first _current items.  (If this shard was interrupted,
        # its position was checkpointed in _current via to_json().)
        for revision in revisions[self._current:]:
            self._current += 1
            yield revision

    @classmethod
    def split_input(cls, mapper_spec):
        """Divide mapping work evenly among a set of input readers.

        This function is called once to create a set of
        ContentRevisionsInputReader instances that will each traverse its
        own distinct set of revisions.  Each reader starts at a different
        offset and steps forward by N, where N is the number of readers
        (shards).
        """
        params = third_party.mapreduce.input_readers._get_params(mapper_spec)
        kinds = params["kinds"].split(",")
        shard_count = mapper_spec.shard_count
        return [cls(kinds, idx, shard_count)
                for idx in range(shard_count)]

    @classmethod
    def validate(cls, mapper_spec):
        """Validate the mapper spec before the job starts.

        Previously a no-op, which deferred a missing or empty "kinds"
        parameter to a KeyError deep inside split_input().  Fail fast
        here with the error type the framework expects instead.

        Raises:
            BadReaderParamsError: if the required "kinds" parameter is
                missing or empty.
        """
        params = third_party.mapreduce.input_readers._get_params(mapper_spec)
        if not params.get("kinds"):
            raise third_party.mapreduce.input_readers.BadReaderParamsError(
                "Required parameter 'kinds' is missing or empty.")

    @classmethod
    def from_json(cls, json):
        """Recreate a reader from the checkpoint produced by to_json()."""
        return cls(json["kinds"], json["start"], json["step"],
                   json["current"])

    def to_json(self):
        """Serialize reader state so this shard can be checkpointed."""
        return {"kinds": self._kinds, "start": self._start,
                "step": self._step, "current": self._current}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CustomMultiFileOutputWriter(
        third_party.mapreduce.output_writers.FileOutputWriter):
    """Output writer that splits results across multiple named files.

    Each datum is routed to one of two files ("-even" / "-odd") based on
    the parity of the integer it contains.
    """

    def __init__(self, filenames):
        # All files this writer may append to, in the order produced by
        # _get_all_filenames() (index 0 = "-even", index 1 = "-odd").
        self._filenames = filenames

    @classmethod
    def _get_all_filenames(cls, mapreduce_state_or_ctx):
        """Generates filenames for this mapreduce job.

        The generator pattern is used for fun, it could just return an
        array.

        Arguments:
            mapreduce_state_or_ctx - may either be a model.MapreduceState
                or a context.Context object.  Thankfully, the members we
                are interested in have the same name in both cases.
        """
        mr_name = mapreduce_state_or_ctx.mapreduce_spec.name
        mr_id = mapreduce_state_or_ctx.mapreduce_spec.mapreduce_id
        for suffix in ('-even', '-odd'):
            yield "%s-%s%s" % (mr_name, mr_id, suffix)

    @classmethod
    def init_job(cls, mapreduce_state):
        """Initialize job-level writer state.

        We must create all files that we will be writing to at the
        beginning, otherwise we would risk multiple instances of
        CustomMultiFileOutputWriter running on separate shards
        concurrently creating the same files.  At least, I think that's
        right.  You could have each shard generate its own set of files,
        and then stitch them together in finalize_job(), but that might
        get hairy.

        Note: Most of this code is copied from
        FileOutputWriterBase.init_job().

        Args:
            mapreduce_state: an instance of model.MapreduceState
                describing the current job.
        """
        # NOTE(review): logging.error is used throughout as debug
        # tracing; kept as-is to preserve observable log output.
        logging.error('CustomMultiFileOutputWriter.init_job() called %s',
                      mapreduce_state)
        try:
            mapper_spec = mapreduce_state.mapreduce_spec.mapper
            params = third_party.mapreduce.output_writers._get_params(
                mapper_spec)
            mime_type = params.get("mime_type", "application/octet-stream")
            filesystem = cls._get_filesystem(mapper_spec=mapper_spec)
            acl = params.get(cls.GS_ACL_PARAM, "project-private")
            filenames = []
            request_filenames = []
            for filename in cls._get_all_filenames(mapreduce_state):
                request_filenames.append(filename)
                system_filename = cls._create_file(
                    filesystem, filename, mime_type, acl=acl)
                logging.error('Created file %s as %s', filename,
                              system_filename)
                filenames.append(system_filename)
            # Note: FileOutputWriterBase.get_filenames() returns these
            # filenames, so if you do anything different here you may
            # have to override that function as well.
            mapreduce_state.writer_state = cls._State(
                filenames, request_filenames).to_json()
        except Exception as e:
            # Deliberate catch-all: this is experimental scaffolding and
            # we prefer a logged failure over aborting the whole job.
            logging.error(
                'CustomMultiFileOutputWriter.init_job() unhandled '
                'exception %s' % e)

    def write(self, data, ctx):
        """Write data to the even or odd file based on its value.

        Args:
            data: actual data yielded from handler.  Expected to be a
                string of digits plus a one-character terminator (the
                trailing character is stripped before parsing).
            ctx: an instance of context.Context.
        """
        try:
            logging.error('custom writer.write invoked with %s', data)
            if ctx.get_pool("file_pool") is None:
                ctx.register_pool(
                    "file_pool",
                    third_party.mapreduce.output_writers._FilePool(ctx=ctx))
            # Parse the integer (sans trailing terminator) and route by
            # parity: even -> self._filenames[0], odd -> [1].
            my_file = self._filenames[int(data[:-1]) % 2]
            logging.error('Writing %s to %s', data, my_file)
            ctx.get_pool("file_pool").append(my_file, str(data))
        except Exception as e:
            # Deliberate catch-all (see init_job); data for this call is
            # dropped, which is acceptable for this experiment.
            logging.error('write unhandled exception: %s' % e)

    @classmethod
    def from_json(cls, state):
        """Recreate a writer from the state produced by to_json()."""
        logging.error('from_json received state %s', state)
        return cls(state["filenames"])

    def to_json(self):
        """Serialize per-shard writer state for checkpointing."""
        return {"filenames": self._filenames}

    @classmethod
    def create(cls, mapreduce_state, shard_number):
        """Create new writer for a shard.

        Args:
            mapreduce_state: an instance of model.MapreduceState
                describing the current job.
            shard_number: shard number as integer.
        """
        state = cls._State.from_json(mapreduce_state.writer_state)
        return cls(state.filenames)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment