Digram (word pair) counter for Cloud Computing and Storage Program 2
// G R Fischer for Fall 2011 Cloud Computing and Storage
// DiGram - Programming assignment 2, part 2 of 2
// 2011-11-10
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.myorg.TextPair;
public class DiGram {
  // The map task breaks the input text into individual words and emits the
  // intermediate key/value pairs:
  //
  //   <Text, Text>, 1
  //
  // where <Text, Text> is of type TextPair, which implements WritableComparable.
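  //
  // For example, the input line "To be or not to be" (lowercased by the mapper)
  // yields the pairs:
  //
  //   <to, be>, 1   <be, or>, 1   <or, not>, 1   <not, to>, 1   <to, be>, 1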
  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private TextPair pair = new TextPair();
    private Text[] buff = new Text[2];

    public void map(LongWritable key, Text value, OutputCollector<TextPair, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      // Skip lines with no tokens (e.g. blank lines or an empty input file):
      if (tokenizer.hasMoreTokens()) {
        buff[0] = new Text(tokenizer.nextToken().toLowerCase());
      } else {
        return;
      }
      // Slide a two-word window over the Text values derived from the input line:
      while (tokenizer.hasMoreTokens()) {
        buff[1] = new Text(tokenizer.nextToken().toLowerCase());
        pair.set(buff[0], buff[1]);
        output.collect(pair, one);
        buff[0] = buff[1];
      }
    }
  }
  // Reduce: given a single TextPair key with a list of occurrence counts, e.g.
  //
  //   <foo, bar>, [ 1, 1, 2 ]
  //
  // it folds them into a summed count:
  //
  //   <foo, bar>, 4
  //
  // Because this class is used as a combiner as well as a reducer, the reduce
  // phase may see individual counts > 1.
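  //
  // Concretely, if one map task emits <foo, bar>, 1 three times, the combiner
  // running alongside that map task folds them to <foo, bar>, 3 before the
  // shuffle; the reducer then sums the partial counts from all map tasks,
  // e.g. [ 3, 1 ] -> 4.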
  public static class Reduce extends MapReduceBase implements Reducer<TextPair, IntWritable, TextPair, IntWritable> {
    public void reduce(TextPair key, Iterator<IntWritable> values, OutputCollector<TextPair, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(DiGram.class);
    conf.setJobName("digram");

    // Don't compress the output (deflate was the default on the EC2 Hadoop instance I used).
    conf.setBoolean("mapred.output.compress", false);

    // Hard-coded task counts; no particular reason for these numbers.
    conf.setNumMapTasks(3);
    conf.setNumReduceTasks(1);

    // Set up the key/value types used between the map and reduce phases.
    conf.setOutputKeyClass(TextPair.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    // Suggest to the runtime that the reducer can also be used as a combiner.
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    // Plain-text input and output; TextPair keys are written out via toString().
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // Input and output directories from the command line.
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    Date start = new Date();
    JobClient.runJob(conf);
    Date now = new Date();
    System.out.println(String.format("Digram took %d milliseconds to run", now.getTime() - start.getTime()));
  }
}
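
// A rough sketch of how the job might be compiled and run; the jar names and
// paths below are hypothetical and assume a Hadoop 0.20.x installation
// (matching the old org.apache.hadoop.mapred API used above):
//
//   javac -classpath hadoop-core-0.20.2.jar -d classes DiGram.java TextPair.java
//   jar cf digram.jar -C classes .
//   hadoop jar digram.jar org.myorg.DiGram input/ output/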
package org.myorg;
import java.io.*;
import org.apache.hadoop.io.*;
// A pair of Text objects that we can emit as a key from a map function.
// Based on example 4.7 in Hadoop: The Definitive Guide, O'Reilly 2010.
public class TextPair implements WritableComparable<TextPair> {
  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }
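
  // hashCode() is what the default HashPartitioner uses to assign a pair to a
  // reduce partition; 163 is just an arbitrary prime multiplier.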
  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + " " + second;
  }
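
  // compareTo() defines the sort order of keys during the shuffle: pairs are
  // ordered by the first word, then by the second.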
  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
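
// A minimal local check (not part of the original assignment) that TextPair
// serializes and deserializes symmetrically. It assumes Hadoop's
// org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer are on the
// classpath; the class name TextPairCheck is made up for this sketch.
package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TextPairCheck {
  public static void main(String[] args) throws IOException {
    TextPair written = new TextPair("foo", "bar");

    // Serialize with write(), then read the same bytes back with readFields().
    DataOutputBuffer out = new DataOutputBuffer();
    written.write(out);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());

    TextPair read = new TextPair();
    read.readFields(in);

    System.out.println(written.equals(read));    // expect: true
    System.out.println(written.compareTo(read)); // expect: 0
  }
}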