Skip to content

Instantly share code, notes, and snippets.

@geofferyzh
Created October 5, 2012 13:16
Show Gist options
  • Save geofferyzh/3839748 to your computer and use it in GitHub Desktop.
Hadoop 101 - Chaining Multiple Jobs
/**
 * Hadoop driver that runs three MapReduce jobs as a sequential pipeline.
 *
 * <p>Each step reads the output directory of the previous step:
 * {@code <input>} -> {@code <root>/output1} -> {@code <root>/output2} -> {@code <root>/output3}.
 * Because of that dependency, a step is only started if the previous one succeeded.
 */
public class ChainJob extends Configured implements Tool {

    /**
     * Runs the three chained jobs in order, stopping at the first failure.
     *
     * @param args {@code args[0]} is the input path for step 1; {@code args[1]} is the root
     *     directory under which {@code output1}, {@code output2} and {@code output3} are created
     * @return 0 if all three jobs succeeded, 1 if any job failed, -1 on a usage error
     * @throws Exception if job setup, submission or monitoring fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: ChainJob <input path> <output root>");
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Configuration conf = getConf();

        // Build each intermediate path once so a step's output and the next step's input
        // are guaranteed to be the same directory.
        Path input = new Path(args[0]);
        Path outputRoot = new Path(args[1]);
        Path output1 = new Path(outputRoot, "output1");
        Path output2 = new Path(outputRoot, "output2");
        Path output3 = new Path(outputRoot, "output3");

        // Step 1: map output types default to the job output types (Text, Text).
        Job job1 = new Job(conf, "Step 1: Calculate app popularity");
        job1.setJarByClass(ChainJob.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        job1.setMapperClass(CosineMapper1.class);
        job1.setReducerClass(CosineReducer1.class);
        FileInputFormat.addInputPath(job1, input);
        FileOutputFormat.setOutputPath(job1, output1);
        if (!runAndReport(job1)) {
            return 1;
        }

        // Step 2: the mapper emits (Text, Text) while the reducer emits (Text, IntWritable),
        // so the map output types must be declared explicitly.
        Job job2 = new Job(conf, "Step 2: Emit pair of apps");
        job2.setJarByClass(ChainJob.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setMapperClass(CosineMapper2.class);
        job2.setReducerClass(CosineReducer2.class);
        FileInputFormat.addInputPath(job2, output1);
        FileOutputFormat.setOutputPath(job2, output2);
        if (!runAndReport(job2)) {
            return 1;
        }

        // Step 3: the mapper emits (Text, IntWritable); the reducer emits (Text, DoubleWritable).
        Job job3 = new Job(conf, "Step 3: Calculate Vector Similarity");
        job3.setJarByClass(ChainJob.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(IntWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);
        job3.setMapperClass(CosineMapper3.class);
        job3.setReducerClass(CosineReducer3.class);
        FileInputFormat.addInputPath(job3, output2);
        FileOutputFormat.setOutputPath(job3, output3);
        if (!runAndReport(job3)) {
            return 1;
        }
        return 0;
    }

    /**
     * Submits {@code job}, blocks until it finishes (printing progress), and reports a failure
     * on stderr.
     *
     * @return {@code true} if the job completed successfully
     */
    private static boolean runAndReport(Job job) throws Exception {
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.err.println("Job failed: " + job.getJobName());
        }
        return succeeded;
    }

    //----------------------------------------------------------------------

    /** Command-line entry point; delegates to {@link ToolRunner} so generic Hadoop options are parsed. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new ChainJob(), args);
        System.exit(exitCode);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment