p1b
// G R Fischer for Fall 2011 Cloud Computing and Storage
// DiGram - Programming assignment 1, part 2 of 3
// 2011-09-20

package net.sacred;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import net.sacred.TextPair;
public class DiGram {

    // The map task breaks each input line into individual words, emitting the
    // intermediate key/value pairs:
    //
    //   <Text, Text>, 1
    //
    // where <Text, Text> is of type TextPair, which implements WritableComparable.
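    //
    // For example, the input line "To be or not to be" emits:
    //
    //   <to, be> 1,  <be, or> 1,  <or, not> 1,  <not, to> 1,  <to, be> 1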
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private TextPair pair = new TextPair();
        private Text[] buff = new Text[2];

        public void map(LongWritable key, Text value, OutputCollector<TextPair, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            // Prime the window with the first word; bail out on an empty line:
            if (tokenizer.hasMoreTokens()) {
                buff[0] = new Text(tokenizer.nextToken().toLowerCase());
            } else {
                return;
            }

            // Slide a two-word window over the remaining tokens:
            while (tokenizer.hasMoreTokens()) {
                buff[1] = new Text(tokenizer.nextToken().toLowerCase());
                pair.set(buff[0], buff[1]);
                output.collect(pair, one);
                buff[0] = buff[1];
            }
        }
    }
    // Reduce: given a single TextPair key with a list of occurrence counts, e.g.
    //
    //   <foo, bar>, [ 1, 1, 2 ]
    //
    // it folds them to the summed occurrences:
    //
    //   <foo, bar>, 4
    //
    // Because this class also serves as a combiner, the reduce phase may see
    // counts greater than 1 in the list.
    public static class Reduce extends MapReduceBase implements Reducer<TextPair, IntWritable, TextPair, IntWritable> {
        public void reduce(TextPair key, Iterator<IntWritable> values, OutputCollector<TextPair, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
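
    // For example, if one map task emits <to, be> twice and another emits it
    // once, the combiners may hand the reducer [ 2, 1 ] instead of [ 1, 1, 1 ];
    // the sum is 3 either way, since addition is associative and commutative.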
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DiGram.class);
        conf.setJobName("digram");

        // Don't compress the output (deflate was the default on the EC2 Hadoop
        // instance I used):
        conf.setBoolean("mapred.output.compress", false);

        // Hardcoded task counts; no particular reason for these numbers:
        conf.setNumMapTasks(3);
        conf.setNumReduceTasks(1);

        // Key/value types used for both the intermediate and final output:
        conf.setOutputKeyClass(TextPair.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // Suggest to the runtime that the reducer can also serve as a combiner:
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        // Plain-text input/output; TextPair keys are written via toString():
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
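
        // Assuming TextPair.toString() joins the two words with a space, each
        // output record would look like (tab-separated):  to be<TAB>4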
        // Input/output directories come from the command line:
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
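
// ---------------------------------------------------------------------------
// Note: DiGram compiles against net.sacred.TextPair, a separate part of this
// assignment that is not included in this gist. The sketch below (a separate
// source file, TextPair.java) is a guess at the minimal shape that class needs:
// two Text fields, ordered by first word then second, with hashCode()/equals()
// consistent with compareTo() so the default partitioner groups equal digrams.
// It is not the original code; field names and the toString() format are
// assumptions.

package net.sacred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
    private Text first = new Text();
    private Text second = new Text();

    public void set(Text first, Text second) {
        this.first.set(first);    // copies, so the mapper can reuse its buffers
        this.second.set(second);
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    // Sort by first word, then second, so identical digrams group together:
    public int compareTo(TextPair other) {
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    // hashCode() drives the default HashPartitioner; equal pairs must agree:
    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextPair)) return false;
        TextPair other = (TextPair) o;
        return first.equals(other.first) && second.equals(other.second);
    }

    // TextOutputFormat writes the final key using toString():
    @Override
    public String toString() {
        return first + " " + second;
    }
}

// With both classes jarred together, the job would be launched with something
// like: hadoop jar digram.jar net.sacred.DiGram <input dir> <output dir>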