@vaidik
Last active December 20, 2015 15:18
Introduction to the Hadoop MapReduce Framework
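#!/bin/bash
# Run the movie/rating join as a Hadoop Streaming job (Hadoop 1.x property names).
streaming=/usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar

# Delete the previous output directory; the job refuses to start if it exists.
hadoop dfs -rmr data/result

# Map output lines start with "<movie id>|<record type>". Partition on field 1
# (the movie id) and sort on field 1, then numerically on field 2, so each
# movie's title record (type 0) reaches its reducer ahead of its ratings (type 1).
hadoop jar "$streaming" \
    -D mapred.map.tasks=2 \
    -D mapred.reduce.tasks=2 \
    -D map.output.key.field.separator='|' \
    -D stream.map.output.field.separator='|' \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.text.key.partitioner.options="-k1,1" \
    -D mapred.text.key.comparator.options="-k1,1 -k2n,2" \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -input data/movies/item \
    -input data/movies/rating \
    -output data/result \
    -mapper "python my_mapper.py" \
    -reducer "python my_reducer.py" \
    -file my_mapper.py \
    -file my_reducer.py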
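
The `-D` options above set up a secondary sort. The partitioner looks only at the movie id, and the comparator orders records by movie id and then by the numeric record type. The Python sketch below imitates those rules in memory so you can check them without a cluster. It simplifies things in a few ways. Each key is modelled as only the `<movie id>|<record type>` prefix that the mapper prints. `partition_for`, `sort_key` and `shuffle` are hypothetical helpers. `zlib.crc32` stands in for Hadoop's own hash, so the partition numbers will not match a real job.

```python
import zlib

SEPARATOR = "|"  # mirrors -D map.output.key.field.separator='|'


def partition_for(key, num_reducers):
    """Choose a reducer from field 1 of the key only, like -k1,1."""
    movie_id = key.split(SEPARATOR)[0]
    # zlib.crc32 is a stand-in: Hadoop hashes the selected key bytes its own way,
    # so real partition numbers differ; only "same id -> same reducer" carries over.
    return zlib.crc32(movie_id.encode("utf-8")) % num_reducers


def sort_key(key):
    """Field 1 compared as text, then field 2 compared as a number (-k1,1 -k2n,2)."""
    fields = key.split(SEPARATOR)
    return (fields[0], int(fields[1]))


def shuffle(records, num_reducers=2):
    """Split (key, value) pairs into per-reducer lists, each sorted by sort_key."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in records:
        partitions[partition_for(key, num_reducers)].append((key, value))
    for part in partitions:
        part.sort(key=lambda kv: sort_key(kv[0]))
    return partitions


if __name__ == "__main__":
    # Made-up keys: movie 50 has a title record and two ratings, movie 7 one rating.
    sample = [("50|1", "1"), ("50|0", "Movie A"), ("7|1", "1"), ("50|1", "1")]
    for number, part in enumerate(shuffle(sample)):
        print(number, part)
```

`-k1,1` has no `n` flag, so movie ids compare as text (`"10"` sorts before `"9"`). That does no harm here. The reducer only needs each movie's records to be adjacent, not sorted by numeric id.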
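
# my_mapper.py: tag every record with its movie id and source (0 = title, 1 = rating).
import os
import sys

# Hadoop 1.x Streaming exposes the path of the current input file as map_input_file.
filepath = os.environ["map_input_file"]

for line in sys.stdin:
    if not line.strip():
        continue  # skip blank lines
    if 'data/movies/rating' in filepath:
        # Rating record, tab-separated: field 1 is the movie id. Emit "<id>|1<TAB>1".
        tokens = line.strip().split("\t")
        print("%s|1\t1" % tokens[1])
    else:
        # Movie record, '|'-separated: field 0 is the id, field 1 the title.
        tokens = line.strip().split('|')
        print("%s|0\t%s" % (tokens[0], tokens[1]))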
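
The mapper works out which file a line came from by reading `map_input_file`. That makes it awkward to run outside Hadoop. One simple way to unit-test the same logic is to pass the path in explicitly. `map_line` below is a hypothetical refactoring and is not part of the gist. The sample lines are made up and follow the layout the mapper assumes: rating lines are tab-separated with the movie id in field 1, and movie lines are `|`-separated with the id and title in fields 0 and 1.

```python
def map_line(filepath, line):
    """Return the "key<TAB>value" line the mapper prints for one input line.

    Hypothetical refactoring: filepath is passed in instead of being read
    from the map_input_file environment variable.
    """
    if "data/movies/rating" in filepath:
        # Rating record: tab-separated, field 1 is the movie id.
        tokens = line.strip().split("\t")
        return "%s|1\t1" % tokens[1]
    # Movie record: '|'-separated, field 0 is the movie id, field 1 the title.
    tokens = line.strip().split("|")
    return "%s|0\t%s" % (tokens[0], tokens[1])


if __name__ == "__main__":
    # Made-up sample lines in the layout the mapper expects.
    print(repr(map_line("data/movies/rating/part-0", "1\t42\t5\t881250949\n")))
    print(repr(map_line("data/movies/item/part-0", "42|Movie A (1995)|01-Jan-1995\n")))
```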
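
# my_reducer.py: join each movie's title with its number of ratings.
# Input arrives sorted by key, so all lines for one movie are adjacent.
import sys

last_movie = None
last_name = None
last_frequency = 0


def output():
    """Print the finished movie as "<id><TAB><title><TAB><rating count>"."""
    if last_movie and last_name:
        print("\t".join([last_movie, last_name, str(last_frequency)]))


for line in sys.stdin:
    key, value = line.strip().split("\t", 1)
    movie_id, rtype = key.strip().split("|")
    rtype = int(rtype)

    if movie_id != last_movie:
        # A new movie starts: flush the previous one and reset the state.
        output()
        last_movie = movie_id
        last_name = None
        last_frequency = 0

    if rtype == 0:
        last_name = value.strip()  # title record
    else:
        last_frequency += 1  # one rating record

output()  # flush the final movie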
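
To sanity-check the whole join without a cluster, you can chain the map, sort and reduce steps in one process. The sketch below is a stand-in for the Streaming job, not a replacement for it. `map_records`, `reduce_records` and `run_join` are hypothetical names. The shuffle is reduced to a single sort, as if the job used one reducer, and the input lines are made up. The sketch groups records with `itertools.groupby` instead of the gist's `last_movie` bookkeeping. It keeps the same rule: only movies that have a title record are emitted.

```python
from itertools import groupby


def map_records(filepath, lines):
    """Mapper logic from my_mapper.py, yielding (movie_id, record_type, value)."""
    for line in lines:
        if not line.strip():
            continue
        if "data/movies/rating" in filepath:
            tokens = line.strip().split("\t")
            yield tokens[1], 1, "1"
        else:
            tokens = line.strip().split("|")
            yield tokens[0], 0, tokens[1]


def reduce_records(sorted_records):
    """Reducer logic from my_reducer.py: (movie_id, title, rating_count) per movie."""
    results = []
    for movie_id, group in groupby(sorted_records, key=lambda r: r[0]):
        name, frequency = None, 0
        for _, record_type, value in group:
            if record_type == 0:
                name = value.strip()
            else:
                frequency += 1
        if name:  # like the gist, drop ids that have ratings but no title record
            results.append((movie_id, name, frequency))
    return results


def run_join(inputs):
    """inputs maps a file path to its lines; returns the joined rows."""
    mapped = []
    for path, lines in inputs.items():
        mapped.extend(map_records(path, lines))
    # Single-reducer view of the shuffle: movie id as text, then record type.
    mapped.sort(key=lambda r: (r[0], r[1]))
    return reduce_records(mapped)


if __name__ == "__main__":
    sample = {  # made-up data; movie 9 has a rating but no title record
        "data/movies/item": ["1|Movie A (1995)|x", "2|Movie B (1995)|x", "3|Movie C (1995)|x"],
        "data/movies/rating": ["10\t1\t4\t0", "11\t1\t5\t0", "12\t2\t3\t0", "13\t9\t2\t0"],
    }
    for row in run_join(sample):
        print("\t".join([row[0], row[1], str(row[2])]))
```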