/**********************************
*DriverMapSideJoinLargeDatasets
*Driver
**********************************/
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class DriverMapSideJoinLargeDatasets {
 
  public static void main(String[] args) throws Exception {
  	JobConf conf = new JobConf("DriverMapSideJoinLargeDatasets");
		conf.setJarByClass(DriverMapSideJoinLargeDatasets.class);
		String[] jobArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
 
		Path dirEmployeesData = new Path(jobArgs[0]);
		Path dirSalaryData = new Path(jobArgs[1]);
		Path dirOutput = new Path(jobArgs[2]);
 
		conf.setMapperClass(MapperMapSideJoinLargeDatasets.class);
 
		conf.setInputFormat(CompositeInputFormat.class);
		String strJoinStmt = CompositeInputFormat.compose("inner",
				KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
 
		conf.set("mapred.join.expr", strJoinStmt);
 
		conf.setNumReduceTasks(0);
 
		conf.setOutputFormat(TextOutputFormat.class);
		TextOutputFormat.setOutputPath(conf, dirOutput);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
 
		RunningJob job = JobClient.runJob(conf);
		while (!job.isComplete()) {
			Thread.sleep(1000);
		}
 
		System.exit(job.isSuccessful() ? 0 : 2);
	}
}