Skip to content

Instantly share code, notes, and snippets.

@donaldpminer
Last active August 29, 2015 14:23
Show Gist options
  • Save donaldpminer/2e013191a5b366ed4df8 to your computer and use it in GitHub Desktop.
Save donaldpminer/2e013191a5b366ed4df8 to your computer and use it in GitHub Desktop.
# This demonstration of the Structured to Hierarchical pattern merges
# two CSV files into a list of JSON documents, one per user, using the
# user's first and last name (joined by a space) as the foreign key.
# $ cat data/structured*csv
# shook,adam,88
# miner,don,34
# oreilly,tim,99
# shook,adam,54
# miner,don,34
# miner,don,hadoop,hive
# miner,don,hadoop,pig
# shook,adam,hadoop,pig
# miner,william,trucks,dump truck
# miner,don,python,scikit-learn
# $ cat data/structured*csv | python structured_to_hierarchical.py
# {"username": "adam shook", "skills": {"hadoop": ["pig"]}, "scores": ["54", "88"]}
# {"username": "don miner", "skills": {"python": ["scikit-learn"], "hadoop": ["hive", "pig"]}, "scores": ["34", "34"]}
# {"username": "tim oreilly", "scores": ["99"]}
# {"username": "william miner", "skills": {"trucks": ["dump truck"]}}
from mrjob.job import MRJob
import json
class job(MRJob):
    """Nest score rows and skill rows under one JSON document per user.

    Demonstrates the Structured to Hierarchical pattern: the mapper keys
    every fact by the user's full name (the foreign key) so that MapReduce
    groups all of a user's facts together, and the reducer folds those
    facts into a single hierarchical document.
    """

    def mapper(self, _, line):
        """Emit ``(full_name, fact)`` for each recognised CSV line.

        A 3-column line ``last,first,score`` yields ``('score', score)``.
        A 4-column line ``last,first,category,skill`` yields
        ``('skill', category, skill)``. Lines with any other number of
        columns are ignored.
        """
        row = line.split(',')
        # Exploit the difference in column counts to determine which file
        # the line came from. In Java MapReduce we'd use MultipleInputs.
        if len(row) == 3:
            # structured.csv: last,first,score
            yield row[1] + ' ' + row[0], ('score', row[2])
        elif len(row) == 4:
            # structured2.csv: last,first,category,skill
            yield row[1] + ' ' + row[0], ('skill', row[2], row[3])

    def reducer(self, user, facts):
        """Build one hierarchical JSON document for ``user``.

        Scores are collected into a list under ``"scores"``; skills are
        grouped into per-category lists under ``"skills"``. Each of those
        keys is present only when the user has at least one fact of that
        kind. Yields ``('', document)`` where ``document`` is the result of
        ``json.dumps`` on the assembled dict.
        """
        out = {'username': user}
        for fact in facts:
            if fact[0] == 'score':
                # setdefault works on Python 2 and 3 alike; dict.has_key
                # no longer exists in Python 3.
                out.setdefault('scores', []).append(fact[1])
            elif fact[0] == 'skill':
                skills = out.setdefault('skills', {})
                skills.setdefault(fact[1], []).append(fact[2])
        yield '', json.dumps(out)
if __name__ == '__main__':
    # Run the job in-process and print each resulting JSON document.
    j = job()
    # Using the runner as a context manager lets mrjob clean up its
    # temporary working files once the output has been read.
    with j.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            # Decode each output line with the job's own output protocol
            # rather than eval(): eval executes whatever text the job
            # produced, and ignores how the protocol actually encoded it.
            _, document = j.parse_output_line(line)
            # print(...) with a single argument behaves the same on
            # Python 2 and Python 3.
            print(document)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment