Last active
August 29, 2015 14:23
-
-
Save donaldpminer/2e013191a5b366ed4df8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This demonstration of the Structured to Hierarchical pattern merges
# two CSV files into a list of JSON documents, using the concatenation
# of the first and last name as the foreign key.
# $ cat data/structured*csv | |
# shook,adam,88 | |
# miner,don,34 | |
# oreilly,tim,99 | |
# shook,adam,54 | |
# miner,don,34 | |
# miner,don,hadoop,hive | |
# miner,don,hadoop,pig | |
# shook,adam,hadoop,pig | |
# miner,william,trucks,dump truck | |
# miner,don,python,scikit-learn | |
# $ cat data/structured*csv | python structured_to_hierarchical.py | |
# {"username": "adam shook", "skills": {"hadoop": ["pig"]}, "scores": ["54", "88"]} | |
# {"username": "don miner", "skills": {"python": ["scikit-learn"], "hadoop": ["hive", "pig"]}, "scores": ["34", "34"]} | |
# {"username": "tim oreilly", "scores": ["99"]} | |
# {"username": "william miner", "skills": {"trucks": ["dump truck"]}} | |
from mrjob.job import MRJob | |
import json | |
class job(MRJob):
    """Join score rows and skill rows on the user's name.

    Emits one JSON document per user, rooted at the "first last" username
    that serves as the foreign key between the two CSV inputs.
    """

    def mapper(self, _, line):
        """Parse one CSV line and emit ``(username, fact)``.

        The *foreign key* (username) is emitted as the key so that all facts
        about one user are grouped together for the reducer. The value is a
        tagged tuple describing the fact to record:

        * 3 columns ``last,first,score``          -> ``('score', score)``
        * 4 columns ``last,first,category,skill`` -> ``('skill', category, skill)``

        Lines with any other number of columns yield nothing.
        """
        row = line.split(',')

        # Exploit the difference in column count to determine the source
        # file. In plain MapReduce we'd use MultipleInputs instead.
        if len(row) == 3:
            # Line is from structured.csv: emit the score.
            yield row[1] + ' ' + row[0], ('score', row[2])
        elif len(row) == 4:
            # Line is from structured2.csv: emit the category and skill.
            yield row[1] + ' ' + row[0], ('skill', row[2], row[3])

    def reducer(self, user, facts):
        """Fold all facts for ``user`` into one hierarchical JSON document.

        The document always has a ``username`` field. ``scores`` (a list) and
        ``skills`` (a mapping of category -> list of skills) are only present
        when at least one fact of that kind was seen.

        Yields a single ``('', json_string)`` pair.
        """
        out = {}
        out['username'] = user

        for fact in facts:
            if fact[0] == 'score':
                # setdefault creates the list on first use; it replaces the
                # Python 2-only dict.has_key() check.
                out.setdefault('scores', []).append(fact[1])
            elif fact[0] == 'skill':
                skills = out.setdefault('skills', {})
                skills.setdefault(fact[1], []).append(fact[2])

        yield '', json.dumps(out)
if __name__ == '__main__':
    # Run the job programmatically and print each resulting JSON document.
    j = job()
    # The context manager lets the runner clean up after itself even if the
    # run fails part-way through.
    with j.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            # Decode the output line with the job's own output protocol
            # rather than eval()-ing text produced by the job.
            _, document = j.parse_output_line(line)
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print(document)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment