Skip to content

Instantly share code, notes, and snippets.

@geofferyzh
Created August 27, 2012 15:01
Show Gist options
  • Save geofferyzh/3489292 to your computer and use it in GitHub Desktop.
Save geofferyzh/3489292 to your computer and use it in GitHub Desktop.
PYMK ("People You May Know") — Stage 3: Hadoop secondary-sort job that selects the top-20 candidates per user.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// Compares the composite key
/**
 * Sort comparator for composite keys of the form "base:secondary".
 * Orders ascending by the natural (base) part, then DESCENDING by the
 * secondary part, so the reducer sees the highest-ranked values first.
 * Assumes every key contains at least one ':' (keys without one would
 * throw ArrayIndexOutOfBoundsException on the secondary compare).
 */
public class CompositeKeyComparator extends WritableComparator {

  /** Registers Text as the key class and allows instance creation. */
  protected CompositeKeyComparator() {
    super(Text.class, true);
  }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    Text k1 = (Text) w1;
    Text k2 = (Text) w2;
    // Keys look like "base:secondary"; compare the two parts separately.
    String[] k1Items = k1.toString().split(":");
    String[] k2Items = k2.toString().split(":");
    int comp = k1Items[0].compareTo(k2Items[0]);
    if (comp == 0) {
      // Descending on the secondary part. Swapping the operands is safer
      // than multiplying compareTo's result by -1: negation silently
      // misbehaves if a comparison ever yields Integer.MIN_VALUE.
      comp = k2Items[1].compareTo(k1Items[1]);
    }
    return comp;
  }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.*;
import java.lang.*;
// Partitions key based on "natural" key
/**
 * Partitioner that routes each composite key "base:secondary" by its
 * natural (base) part only, so all records sharing a base key land on the
 * same reducer and can be secondary-sorted there.
 */
public class myPartitioner<K2, V2> implements Partitioner<Text, Text> {

  /** No job configuration is needed. */
  @Override
  public void configure(JobConf job) {
  }

  /** Hashes only the text before the first ':', masked non-negative. */
  public int getPartition(Text key, Text value, int numPartitions) {
    String naturalKey = key.toString().split(":")[0];
    return (naturalKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// Groups values based on the natural key
/**
 * Grouping comparator: treats two composite keys as equal when their
 * natural parts (the text before the first ':') match, so a single
 * reduce() call receives every value belonging to one natural key.
 */
public class NaturalKeyGroupingComparator extends WritableComparator {

  /** Registers Text as the key class and allows instance creation. */
  protected NaturalKeyGroupingComparator() {
    super(Text.class, true);
  }

  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    String base1 = ((Text) w1).toString().split(":")[0];
    String base2 = ((Text) w2).toString().split(":")[0];
    return base1.compareTo(base2);
  }
}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for PYMK stage 3. Wires the secondary-sort pieces — composite-key
 * sort comparator, natural-key grouping comparator, and natural-key
 * partitioner — around the stage-3 mapper and reducer.
 */
public class pymk3 extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // Exactly two positional arguments are required: input dir, output dir.
    if (args.length != 2) {
      System.out.printf(
          "Usage: %s [generic options] <input dir> <output dir>\n", getClass()
          .getSimpleName());
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    JobConf job = new JobConf(getConf(), pymk3.class);
    job.setJobName(this.getClass().getName());

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(pymk3_mapper.class);
    job.setReducerClass(pymk3_reducer.class);

    // Both intermediate and final records are Text -> Text.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Secondary-sort plumbing: full composite ordering for the shuffle,
    // natural-key grouping for reduce() calls, natural-key partitioning.
    job.setOutputKeyComparatorClass(CompositeKeyComparator.class);
    job.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
    job.setPartitionerClass(myPartitioner.class);
    // job.setNumReduceTasks(1);

    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new pymk3(), args));
  }
}
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
 * Stage-3 mapper. Each input line is split as
 * "A,B:secondary&lt;TAB&gt;payload" (presumably A/B are a user pair and the
 * payload is ranking info from stage 2 — TODO confirm against stage-2
 * output). Emits key "A:secondary" and value "B:payload" so the
 * secondary-sort comparators can order candidates per user.
 *
 * Malformed lines (missing tab, ':' or ',') are skipped instead of
 * throwing ArrayIndexOutOfBoundsException and failing the whole task,
 * which is what the original unguarded indexing did.
 */
public class pymk3_mapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String[] str = value.toString().split("\t");
    if (str.length < 2) {
      return; // no tab-separated value column
    }
    String[] keystr = str[0].split(":");
    if (keystr.length < 2) {
      return; // key column lacks the ':' separator
    }
    String[] pairstr = keystr[0].split(",");
    if (pairstr.length < 2) {
      return; // pair lacks the ',' separator
    }
    output.collect(new Text(pairstr[0] + ":" + keystr[1]),
        new Text(pairstr[1] + ":" + str[1]));
  }
}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
 * Stage-3 reducer. Values arrive pre-sorted (descending on the secondary
 * key, per CompositeKeyComparator), so emitting only the first 20 values
 * yields the top-20 candidates for each natural key.
 *
 * Assumes each value has three ':'-separated fields
 * "candidate:part1:part2" — presumably candidate id plus ranking info;
 * NOTE(review): confirm against the mapper/stage-2 format, since a value
 * with fewer fields would throw ArrayIndexOutOfBoundsException.
 */
public class pymk3_reducer extends MapReduceBase implements
    Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // Natural key is the text before the first ':' of the composite key.
    String naturalKey = key.toString().split(":")[0];
    // Emit at most 20 values; remaining values are deliberately left
    // unconsumed, matching the original behavior.
    for (int emitted = 0; values.hasNext() && emitted < 20; emitted++) {
      String[] fields = values.next().toString().split(":");
      output.collect(new Text(naturalKey + "," + fields[0]),
          new Text(fields[1] + ":" + fields[2]));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment