Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A quick experiment with the combiner_spec parameter in the appengine MapreducePipeline()
import random
import logging
def map(data):
    """Mapper: pair the incoming record with a random key.

    The name shadows the ``map`` builtin; it is kept because the function is
    part of this module's public interface.

    Args:
        data: a single record handed over by the input reader.

    Yields:
        One ``(key, data)`` tuple, where ``key`` is a random int in 1..10.
    """
    try:
        # Generate a random key from 1..10
        key = random.randint(1, 10)
        logging.info("%s %s", key, data)
        yield (key, data)
    except Exception:
        # Best-effort: log and swallow ordinary failures.  ``Exception`` (not
        # a bare ``except``) lets GeneratorExit pass through, so closing the
        # generator early no longer logs a bogus error traceback.
        logging.exception("map failed")
def combine(key, new_values, old_values):
    """Combiner: pass through only the new values not seen before.

    A value counts as seen if it is in ``old_values`` or was already yielded
    earlier in the same call.

    Args:
        key: the key the values belong to (only used for logging here).
        new_values: iterable of freshly mapped values for ``key``.
        old_values: iterable of values to treat as already seen.

    Yields:
        Each value from ``new_values`` the first time it is encountered,
        in input order.
    """
    try:
        # NOTE(review): logged at ERROR level, presumably so the combiner
        # calls stand out in the logs during the experiment -- confirm.
        logging.error("%s %s %s", key, new_values, old_values)
        seen = set(old_values)
        for value in new_values:
            # Remove duplicates
            if value not in seen:
                seen.add(value)
                yield value
            else:
                logging.info("Removing duplicate %s", value)
    except Exception:
        # Best-effort: log and swallow ordinary failures.  ``Exception`` (not
        # a bare ``except``) lets GeneratorExit pass through, so closing the
        # generator early no longer logs a bogus error traceback.
        logging.exception("combine failed")
def reduce(key, values):
    """Reducer: turn every value for ``key`` into one output line.

    Args:
        key: the key being reduced.
        values: iterable of values collected for ``key``.

    Yields:
        One string per value, formatted as ``"<key> - <value>\\n"``.
    """
    try:
        logging.info("%s %s", key, values)
        # Emit one output line per value: "<key> - <value>"
        for v in values:
            yield "%s - %s\n" % (key, v)
    except Exception:
        # Best-effort: log and swallow ordinary failures.  ``Exception`` (not
        # a bare ``except``) lets GeneratorExit pass through, so closing the
        # generator early no longer logs a bogus error traceback.
        logging.exception("reduce failed")
class ContentAnalyticsLaunch(ContentAnalyticsQuery):
    """Request handler whose GET starts the ``test_combiner`` pipeline."""

    @user_util.manual_access_checking  # superuser only via app.yaml (/admin)
    def get(self):
        """Build and start the MapreducePipeline, then log its pipeline id.

        Failures while importing, building or starting the pipeline are
        logged with a traceback and not re-raised.
        """
        # Imported before the ``try``: a function-level import makes
        # ``logging`` a local name, so it must already be bound when the
        # ``except`` clause runs (e.g. if the mapreduce import itself fails).
        import logging

        try:
            from third_party import mapreduce

            pipeline = mapreduce.mapreduce_pipeline.MapreducePipeline(
                "test_combiner",
                "content_analytics.handlers.map",
                "content_analytics.handlers.reduce",
                # A debug input_reader provided by the SDK for testing purposes
                "third_party.mapreduce.input_readers.RandomStringInputReader",
                "third_party.mapreduce.output_writers.BlobstoreOutputWriter",
                combiner_spec="content_analytics.handlers.combine",
                mapper_params={
                    "string_length": 1,
                    "count": 500,
                },
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=16)
            pipeline.start()
            # NOTE(review): ERROR level presumably chosen so the id is easy
            # to spot in the logs -- confirm.
            logging.error(pipeline.pipeline_id)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so BaseException
            # subclasses (SystemExit, KeyboardInterrupt, ...) still propagate.
            logging.exception("Failed to start test_combiner pipeline")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment