Skip to content

Instantly share code, notes, and snippets.

@geofferyzh
Created August 27, 2012 14:55
Show Gist options
  • Save geofferyzh/3489223 to your computer and use it in GitHub Desktop.
Save geofferyzh/3489223 to your computer and use it in GitHub Desktop.
PYMK Stage2 C
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class pymk2 extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.printf(
"Usage: %s [generic options] <input dir> <output dir>\n", getClass()
.getSimpleName());
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
JobConf conf = new JobConf(getConf(), pymk2.class);
conf.setJobName(this.getClass().getName());
KeyValueTextInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(pymk2_mapper.class);
conf.setReducerClass(pymk2_reducer.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
// conf.setNumReduceTasks(0);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new pymk2(), args);
System.exit(exitCode);
}
}
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class pymk2_mapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String[] s = value.toString().split("\t");
String[] vs = s[1].split(",");
List<String> vlist = Arrays.asList(vs);
for (int i=0; i < vlist.size(); i++) {
output.collect(new Text(s[0] + "," + vlist.get(i)), new Text("1stD"));
for (int j= i+1; j < vlist.size(); j++) {
output.collect(new Text(vlist.get(i) + "," + vlist.get(j)), new Text(s[0]));
output.collect(new Text(vlist.get(j) + "," + vlist.get(i)), new Text(s[0]));
}
}
}
}
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class pymk2_reducer extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String commonfr = "";
int count = 0;
// while loop to append a list 2nd degree connected friends
while (values.hasNext()) {
Text value = values.next();
count = count +1;
// deal with first reco in the list
if (commonfr == "") {
commonfr = value.toString();
}
else {
if (commonfr.lastIndexOf(value.toString()) <0 ) {
commonfr = commonfr + "," + value.toString();
}
}
}
// do not emit if pair of users are 1st degree connected
if (commonfr.indexOf("1stD") < 0) {
output.collect(new Text(key + ":" + Integer.toString(count)), new Text(Integer.toString(count) + ":" + commonfr));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment