Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# Experimental code demonstrating arbitrary mappers and reducers in the mapreduce library.
import collections
import jinja2
import logging
import os
import request_handler
import third_party.mapreduce
import third_party.mapreduce.input_readers
import third_party.mapreduce.output_writers
import third_party.mapreduce.lib.files
import third_party.mapreduce.operation
from google.appengine.ext import db
import compat_key
import content.models
import content.publish
import layer_cache
import setting_model
import user_models
import user_util
# Sample input for the demo job: the integers 0 through 18.
#
# The values 19..999 used to follow an inline "#" comment marker inside the
# list literal, i.e. they were deliberately disabled.  Because that comment
# was wrapped over several physical lines, part of it (421..999) became live
# data again.  The walkthrough in the comments that follow (output
# "0'1'...18'") relies on exactly 0..18, so the list is generated here
# instead.  Widen the range (e.g. range(1000)) to experiment with more data.
RAW_DATA = list(range(19))
# TODO(mattfaus): Capture this in a wiki page on https://code.google.com/p/appengine-mapreduce/w/list
# Below is a description of how RAW_DATA passes through the mapreduce library and
# winds up in a blob file.
# 1. StartJobHandler._start_map() calls validate(), passing the full MapperSpec,
# on both the input_reader and the output_writer
# MapperSpec(advanced_mapreduce.data_processor, advanced_mapreduce.CustomMapper, {'input_reader': {'foo': 'bar'}, 'processing_rate': 128, 'output_writer': {'output_sharding': 'none', 'filesystem': 'blobstore'}}, 32)
# 1a. StartJobHandler._start_map() verifies that the handler_spec returns a proper handler
# 1b. StartJobHandler._start_map() creates a new model.MapreduceState entity
# 1c. "" adds a call to kickoffjob_callback to the queue_name
# 2. KickOffJobHandler.handle() calls InputReader.split_input()
# Note: It's OK if there are no InputReaders returned by split_input()
# split_input got MapperSpec(advanced_mapreduce.data_processor, advanced_mapreduce.CustomMapper, {u'input_reader': {u'foo': u'bar'}, u'processing_rate': 128, u'output_writer': {u'output_sharding': u'none', u'filesystem': u'blobstore'}}, 32)
# split_input got params {'foo': u'bar'}
# Which, in turn, calls InputReader.__init__() for each of the input shards
# 2a. "" calls OutputWriter.init_job()
# 2b. "" for each InputReader returned by split_input(), call:
# OutputWriter.create(), add this to the list of OutputWriter's
# NOTE: The number of InputReaders returned by split_input() determines the number of shards
# 3. to_json() is called on all of the CustomMapper instances that split_input returned,
# presumably to distribute across the cluster
# 4. from_json() is called on all the shards, presumably to resume that shard's work
# 4b. CustomMapper.__init__ is called (from within from_json())
# 5. __iter__ is called on all of the shards, not all of them have to do work
# Starting processing of 0 items from 1
# Starting processing of 0 items from 0
# 6. data_processor (the handler) is invoked with the output of __iter__
# Handler invoked with data 17
# 6a. InputReader.__iter__ generates the work items
# 6b. MapperWorkerCallbackHandler.process_data() streams:
# - work items from InputReader.__iter__ into handler_spec
# - results from handler_spec into OutputWriter.write()
# 7. As the shards complete their work, they POST to /mapreduce/worker_callback
# Which the mapreduce library handles in third_party.mapreduce.handlers.MapperWorkerCallbackHandler
# server.py:593] default: "POST /mapreduce/worker_callback HTTP/1.1" 200 -
# 8. When all shards have completed, there is a post to POST /mapreduce/controller_callback
# Which the mapreduce library handles in third_party.mapreduce.handlers.ControllerCallbackHandler
# INFO 2013-07-25 17:31:20,680 server.py:593] default: "POST /mapreduce/worker_callback HTTP/1.1" 200 -
# INFO 2013-07-25 17:31:21,062 handlers.py:449] Final result for job '1580509570902BA1D5DAA' is 'success'
# 8a. Calls ControllerCallbackHandler._finalize_job(), which calls output_writer_class().finalize_job(mapreduce_state)
# 8b. FileOutputWriterBase.finalize_job(), closes all the files, and returns the blob_keys
# 9. I modified the mapreduce status UI to output the final blob key
# 10. Download that blob, and voila, you've got RAW_DATA' !
# 0'1'2'3'4'5'6'7'8'9'10'11'12'13'14'15'16'17'18'
############
# Or, using the CustomMultiFileOutputWriter:
# 8. When all shards have completed, there is a post to POST /mapreduce/controller_callback
# Which the mapreduce library handles in third_party.mapreduce.handlers.ControllerCallbackHandler
# 8a. Calls CustomMultiFileOutputWriter._finalize_job(),
# 8b. CustomMultiFileOutputWriter.finalize_job(), closes all the files, and returns the blob_keys
# 9. I modified the mapreduce status UI to output the final blob keys
# 10. You have two files which contain, respectively:
# 0'2'4'6'8'10'12'14'16'18'
# 1'3'5'7'9'11'13'15'17'
############
# And, for parallelized reduce:
# 6b. CustomParallelReduce:
# Collate data written into smaller chunks
# At job finalization, launch another MapReduce, which
# Splits intermediate data
# Finally, writes into blob files with CustomMultiFileOutputWriter
class StartAdvancedMapreduce(request_handler.RequestHandler):
    """Request handler whose GET starts the demo "AdvancedMapreduce" job."""

    @user_util.manual_access_checking  # Shhhh
    def get(self):
        """Start the map job and log the id returned by start_map()."""
        # Parameters for the output writer (required by FileOutputWriterBase).
        writer_params = {
            # "output_sharding": "input",  # Only needed for FileOutputWriter,
            # default is 'none', which is what we want usually
            "filesystem": 'blobstore',
            # OR
            # "filesystem": 'gs',
            # "gs_bucket_name": 'bobthebucket',
            # "gs_acl": "project-private",  # Optional, project-private is default
        }
        mapper_params = {
            "input_reader": {
                "foo": 'bar'
            },
            "output_writer": writer_params,
            "processing_rate": 128,
        }
        job_params = {
            # "done_callback": "/devadmin/content/backfill_done"
        }

        # Responsible for splitting input based on input_reader params
        reader = "advanced_mapreduce.CustomMapper"
        # Responsible for processing input from InputReader, outputing data
        # to OutputWriter
        handler = 'advanced_mapreduce.data_processor'
        # Responsible for writing output.
        # Single-file writing would be:
        #   'third_party.mapreduce.output_writers.FileOutputWriter'
        # Multi-file writing (custom):
        writer = 'advanced_mapreduce.CustomMultiFileOutputWriter'

        # Kick off the mapreduce
        mapreduce_id = third_party.mapreduce.control.start_map(
            name="AdvancedMapreduce",
            reader_spec=reader,
            handler_spec=handler,
            output_writer_spec=writer,
            mapper_parameters=mapper_params,
            mapreduce_parameters=job_params,
            shard_count=32,
            queue_name="backfill-mapreduce-queue")
        logging.error('Starting mapreduce job %s' % mapreduce_id)
class CustomMapper(third_party.mapreduce.input_readers.InputReader):
    """Input reader that serves one strided slice of RAW_DATA per shard.

    A reader built with (start, step) covers RAW_DATA[start::step].
    `current` counts how many items of that slice have already been handed
    out, so a reader rebuilt via from_json() resumes at that position.
    """

    def __init__(self, start, step, current=0):
        self._start = start
        self._step = step
        # Position inside this reader's own slice (not inside RAW_DATA).
        self._current = current
        logging.error('CustomMapper init with %i, %i, %i' % (start, step, current))

    def __iter__(self):
        """Yield the not-yet-consumed items of this reader's slice."""
        shard_items = RAW_DATA[self._start::self._step]
        logging.error(
            'Starting processing of %i items from %i' % (
                self._current, len(shard_items)))
        remaining = shard_items[self._current:]
        for item in remaining:
            # The counter is advanced before the item is handed out.
            self._current += 1
            # TODO(mattfaus): What if we are interrupted right here?
            yield item

    @classmethod
    def split_input(cls, mapper_spec):
        """Create one reader per shard, interleaving RAW_DATA between them.

        Called once per job.  Reader number i starts at offset i and steps
        forward by N, where N is mapper_spec.shard_count, so the readers
        together cover every index of RAW_DATA exactly once.
        """
        params = third_party.mapreduce.input_readers._get_params(mapper_spec)
        logging.error('split_input got %s' % mapper_spec)
        logging.error('split_input got params %s' % params)
        shard_count = mapper_spec.shard_count
        readers = []
        for shard_index in xrange(shard_count):
            readers.append(cls(shard_index, shard_count))
        return readers

    @classmethod
    def validate(cls, mapper_spec):
        """Log the spec; no actual validation is performed."""
        logging.error('validate() got %s' % mapper_spec)

    @classmethod
    def from_json(cls, json):
        """Rebuild a reader from the dict produced by to_json()."""
        logging.error('deserializing')
        start, step, current = json["start"], json["step"], json["current"]
        return cls(start, step, current)

    def to_json(self):
        """Return this reader's start/step/current as a plain dict."""
        logging.error('serializing')
        state = {}
        state["start"] = self._start
        state["step"] = self._step
        state["current"] = self._current
        return state
# TODO(mattfaus): Play with input_reader.expand_parameters in process_data()
class CustomMultiFileOutputWriter(third_party.mapreduce.output_writers.FileOutputWriter):
    """Output writer that routes even and odd values into two separate files.

    Both files are created once in init_job().  Every writer instance created
    for a shard receives the same list of filenames and appends each value to
    the file selected by the value's parity.
    """

    @classmethod
    def _get_all_filenames(cls, mapreduce_state_or_ctx):
        """Generates filenames for this mapreduce job.

        The generator pattern is used for fun, it could just return an array.

        Arguments:
            mapreduce_state_or_ctx - may either be a model.MapreduceState
                or a context.Context object. Thankfully, the members we
                are interested in have the same name in both cases.

        Yields:
            "<job name>-<job id>-even", then "<job name>-<job id>-odd".
        """
        mr_name = mapreduce_state_or_ctx.mapreduce_spec.name
        mr_id = mapreduce_state_or_ctx.mapreduce_spec.mapreduce_id
        file_suffixes = ['-even', '-odd']
        for suffix in file_suffixes:
            yield "%s-%s%s" % (mr_name, mr_id, suffix)

    @classmethod
    def init_job(cls, mapreduce_state):
        """Initialize job-level writer state.

        We must create all files that we will be writing to at the beginning,
        otherwise we would risk multiple instances of
        CustomMultiFileOutputWriter running on separate shards concurrently
        creating the same files. At least, I think that's right.

        You could have each shard generate its own set of files, and then
        stitch them together in finalize_job(), but that might get hairy.

        Note: Most of this code is copied from FileOutputWriterBase.init_job()

        Args:
            mapreduce_state: an instance of model.MapreduceState describing
                current job.

        Raises:
            Exception: any error raised while creating the files is logged
                and re-raised.  Swallowing it would leave
                mapreduce_state.writer_state unset, which create() needs.
        """
        logging.error('CustomMultiFileOutputWriter.init_job() called %s' % mapreduce_state)
        try:
            mapper_spec = mapreduce_state.mapreduce_spec.mapper
            params = third_party.mapreduce.output_writers._get_params(mapper_spec)
            mime_type = params.get("mime_type", "application/octet-stream")
            filesystem = cls._get_filesystem(mapper_spec=mapper_spec)
            # A Google Storage bucket (params.get(cls.GS_BUCKET_NAME_PARAM))
            # would be prefixed to each filename here: "%s/%s" % (bucket, filename)
            acl = params.get(cls.GS_ACL_PARAM, "project-private")

            filenames = []
            request_filenames = []
            for filename in cls._get_all_filenames(mapreduce_state):
                request_filenames.append(filename)
                system_filename = cls._create_file(
                    filesystem, filename, mime_type, acl=acl)
                logging.error('Created file %s as %s', filename, system_filename)
                filenames.append(system_filename)

            # Note: FileOutputWriterBase.get_filenames() returns these
            # filenames, so if you do anything different, you may have to
            # override that function.
            mapreduce_state.writer_state = cls._State(
                filenames, request_filenames).to_json()
        except Exception as e:
            logging.exception(
                'CustomMultiFileOutputWriter.init_job() unhandled exception %s', e)
            raise

    def write(self, data, ctx):
        """Write data to the even or the odd file.

        Args:
            data: actual data yielded from handler. Expected to be an integer
                followed by exactly one trailing character (data_processor
                yields strings such as "17'").
            ctx: an instance of context.Context.

        Raises:
            Exception: any error is logged and re-raised so the value is not
                silently dropped.
        """
        try:
            logging.error('custom writer.write invoked with %s' % data)
            if ctx.get_pool("file_pool") is None:
                ctx.register_pool(
                    "file_pool",
                    third_party.mapreduce.output_writers._FilePool(ctx=ctx))

            # Strip the trailing character, then pick the file by parity:
            # index 0 is the '-even' file, index 1 the '-odd' file.
            num = int(data[:-1])
            my_file = self._filenames[num % 2]

            logging.error('Writing %s to %s' % (data, my_file))
            ctx.get_pool("file_pool").append(my_file, str(data))
        except Exception as e:
            logging.exception('write unhandled exception: %s', e)
            raise

    @classmethod
    def from_json(cls, state):
        """Rebuild a writer from the dict produced by to_json()."""
        logging.error('from_json received state %s', state)
        return cls(state["filenames"])

    def to_json(self):
        """Return the writer's filenames as a plain dict."""
        return {"filenames": self._filenames}

    def __init__(self, filenames):
        """Remember the filenames this writer appends to.

        Args:
            filenames: list indexed by parity (0 = even file, 1 = odd file).
        """
        self._filenames = filenames

    @classmethod
    def create(cls, mapreduce_state, shard_number):
        """Create new writer for a shard.

        Every shard gets the full list of filenames stored by init_job();
        shard_number is not used.

        Args:
            mapreduce_state: an instance of model.MapreduceState describing
                current job.
            shard_number: shard number as integer.
        """
        state = cls._State.from_json(mapreduce_state.writer_state)
        return cls(state.filenames)
def data_processor(data):
    """Map handler: log the work item and emit it followed by an apostrophe.

    Written as a generator because mapreduce only receives handler output
    that is yielded.
    """
    logging.error('Handler invoked with data %s' % data)
    formatted = '%i\'' % data
    yield formatted
# TODO(mattfaus): Rename query_config to job_config
# TODO(mattfaus): Rename OutputWriter
# TODO(mattfaus): Rename Query* to Map*
class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.

    Abstract interface: every method raises NotImplementedError until a
    sub-class overrides it.
    """

    def write_metadata(self, metadata):
        """Writes job-level metadata. Must be implemented by sub-classes."""
        # NotImplemented is a comparison sentinel and is not callable;
        # NotImplementedError is the exception meant for abstract methods.
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data."""
        raise NotImplementedError()

    def finalize(self):
        """Returns the final result. Maybe a blob_key, maybe something else."""
        raise NotImplementedError()
class CheckpointedDataWriter(DataWriter):
    """Placeholder sub-class of DataWriter; nothing is overridden yet.

    NOTE(review): presumably meant to add checkpointing so that draining
    results can be made idempotent - confirm before implementing.
    """
class QueryPipeline(pipeline.Pipeline):
    """Extend this class to process small subsets of data."""

    outputs = ['query_result']

    def run(self, *args, **kwargs):
        """Runs the query, writes it to query_result.

        The positional and keyword arguments are forwarded unchanged to
        query().  If the query or the fill fails, handle_exception() is
        invoked.  Whatever happens, the pipeline finally yields
        common.Return(self.pipeline_id).
        """
        try:
            # Unpack the arguments: query(args, kwargs) would pass a tuple and
            # a dict as two positional arguments.
            result = self.query(*args, **kwargs)
            self.fill(self.query_result, result)
        except Exception:
            self.handle_exception()
        finally:
            yield common.Return(self.pipeline_id)

    def handle_exception(self):
        """Records the exception being handled in query_result. Overridable.

        Must be called while an exception is being handled (as run() does).
        The intended behaviour is to raise the first 2 exceptions and fail
        gracefully on the 3rd, but retries are not implemented yet.
        """
        # Local import: this module does not import sys at file level.
        import sys

        # TODO(mattfaus): Raise for retries
        error = sys.exc_info()[1]
        # NOTE(review): slot values may need to be serializable; confirm that
        # storing the exception object itself is acceptable.
        self.fill(self.query_result, {'errors_sheet': error})

    def query(self, *args, **kwargs):
        """Returns a dict that our DataWriter understands."""
        raise NotImplementedError("Implement in sub-class.")

    @property
    def task_name(self):
        """Override in sub-classes for easier debugging."""
        return 'unknown'
class CoordinatedQueryPipeline(pipeline.Pipeline):
    """Coordinates a launcher (producer) and a drainer (consumer) pipeline.

    Sub-classes supply the DataWriter and the QueryPipelines for a given
    query_config by overriding get_data_writer() and get_query_pipelines().
    """

    def get_data_writer(self, query_config):
        """Given query_config, gets the DataWriter."""
        raise NotImplementedError()

    def get_query_pipelines(self, query_config):
        """Given query_config, gets the QueryPipelines.

        Returns:
            An array of QueryPipelines
        """
        # Parse query_config to instantiate QueryPipelines, return
        raise NotImplementedError()

    def get_reduce_pipelines(self, map_results):
        """Given map_results, segment the reduce pipelines.

        Not implemented yet: currently returns None.

        Arguments:
            map_results - a list of pipeline_ids
        """

    # An earlier, unfinished draft of run() yielded every pipeline from
    # get_query_pipelines() itself, collected the resulting pipeline_ids in
    # map_results and was going to hand them to get_reduce_pipelines().  It
    # stopped at an incomplete "reduce_results =" assignment and was shadowed
    # by the definition of run() that followed it, so only that definition
    # is kept here.

    def run(self, query_config):
        """Starts the producer and consumer pipelines.

        Args:
            query_config: passed unchanged to both child pipelines.
        """
        num_query_results = yield QueryLauncherPipeline(query_config)
        # Pass on the launcher's result; the name used here previously
        # (num_queued_query_results) was never defined.
        yield QueryDrainerPipeline(query_config, num_query_results)
        # Or, parallelize the reduction (with a recursive DataWriter)
        # parallelization_factor = 10
        # for p in range(parallelization_factor):
        #     yield QueryDrainerPipeline()
class QueryLauncherPipeline(CoordinatedQueryPipeline):
    """Wraps every query pipeline in an EnqueuePipeline and sums the results."""

    def run(self, query_config):
        """Yield one EnqueuePipeline per query pipeline, then their Sum."""
        pending = []
        for child in self.get_query_pipelines(query_config):
            enqueued = yield EnqueuePipeline(
                child,
                tag=self.root_pipeline_id,
                task_name=child.task_name)
            pending.append(enqueued)
        yield common.Sum(*pending)
class QueryDrainerPipeline(CoordinatedQueryPipeline):
    """Feeds num_queued query results into the DataWriter and returns its result."""

    def run(self, query_config, num_queued):
        """Write num_queued results, then yield the writer's final result.

        Args:
            query_config: used to obtain the DataWriter.
            num_queued: number of queued query results to consume.
        """
        # Must be idempotent, so add checkpointing to DataWriter?
        # get_data_writer() takes only query_config in this file, so no
        # second argument is passed.
        data_writer = self.get_data_writer(query_config)

        # TODO(mattfaus): How can we guarantee this takes <10min to run?
        # Async pipeline, or launch a background thread?
        dequeued_count = 0
        while dequeued_count < num_queued:
            task = self._lease_task()
            # Task payload is simply a pipeline_id (thanks to QueryPipeline)
            query_pipeline = QueryPipeline.from_id(task.payload)
            data_writer.write_result(query_pipeline.query_result)
            dequeued_count += 1
            # mark as processed-by self.pipeline_id (for idempotency)

        yield common.Return(data_writer.finalize())
        # Delete all tasks

    def _lease_task(self):
        """Lease the next task from the pull-queue.

        Not implemented yet.  Raising explicitly replaces the reference to
        an undefined `task` variable that run() used before.
        """
        raise NotImplementedError("Leasing tasks from the pull-queue is not implemented.")
# Example 1 - A simple segmentation and query example
########################################################################
class ExampleQueryPipeline(QueryPipeline):
    """Example query stage; the actual query is still to be written."""

    def query(self, example_id):
        """Stub for the query of one example_id; returns None for now."""
        # TODO(mattfaus): Issue datastore queries, fill in dict, return
        return None
class ExampleCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Example coordinator: one ExampleQueryPipeline per id, CSV output."""

    def get_data_writer(self, query_config):
        """Return a new CsvWriter; query_config is not consulted."""
        writer = CsvWriter()  # or a DataStoreWriter(), perhaps?
        return writer

    def get_query_pipelines(self, query_config):
        """Lazily yield an ExampleQueryPipeline for each of query_config['ids']."""
        ids = query_config['ids']
        for current_id in ids:
            yield ExampleQueryPipeline(current_id)
def run_my_query(query_config):
    """Start the example coordinated pipeline and print its default output.

    Args:
        query_config: dict with an 'ids' entry, as read by
            ExampleCoordinatedQueryPipeline.get_query_pipelines().
    """
    stage = ExampleCoordinatedQueryPipeline(query_config)
    # Pipelines are launched with start().  Calling run() directly omitted
    # the required query_config argument and would not execute the pipeline.
    stage.start()
    # Output the result of DataWriter.finalize()
    # NOTE(review): start() presumably returns before the pipeline has
    # finished, so the output may not be filled yet at this point - confirm
    # (tests may want start_test(), or re-load the stage via from_id() later).
    print(stage.outputs.default.value)
# Example 2 - A more complex segmentation and query example
########################################################################
class NewSriCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Coordinator that fans out per coach, per student and per date segment."""

    def get_data_writer(self, query_config):
        """Return a new CsvWriter; query_config is not consulted."""
        writer = CsvWriter()
        return writer

    def get_query_pipelines(self, query_config):
        """Lazily yield the coach pipeline and all student pipelines.

        For each id in query_config['coach_ids'] a NewCoachPipeline comes
        first, followed by one NewStudentPipeline per (student, date segment)
        combination of that coach.
        """
        for coach in query_config['coach_ids']:
            yield NewCoachPipeline(coach)
            for student in get_students(coach):
                for segment_start, segment_end in get_date_segments(student):
                    yield NewStudentPipeline(
                        student, segment_start, segment_end)
class NewCoachPipeline(QueryPipeline):
    """Query stage for a single coach; the query is still to be written."""

    def query(self, coach_id):
        """Stub: query for all coach-related data, return as a dict.

        Returns None for now.
        """
        return None
class NewStudentPipeline(QueryPipeline):
    """Query stage for one student and one date segment; still a stub."""

    def query(self, student_id, start_dt, end_dt):
        """Stub: query for all student-related data, return as a dict.

        Returns None for now.
        """
        return None
def generate_sri_data(query_config):
    """Start the SRI coordinated pipeline and print its default output.

    Args:
        query_config: dict with a 'coach_ids' entry, as read by
            NewSriCoordinatedQueryPipeline.get_query_pipelines().
    """
    stage = NewSriCoordinatedQueryPipeline(query_config)
    # Pipelines are launched with start().  Calling run() directly omitted
    # the required query_config argument and would not execute the pipeline.
    stage.start()
    # NOTE(review): start() presumably returns before the pipeline has
    # finished, so the output may not be filled yet at this point - confirm.
    print(stage.outputs.default.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment