Skip to content

Instantly share code, notes, and snippets.

@dlwh
Created September 2, 2008 07:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dlwh/8389 to your computer and use it in GitHub Desktop.
Save dlwh/8389 to your computer and use it in GitHub Desktop.
/**
 * Configures and runs one Hadoop job (old `mapred` API) over `paths`, then lists its output directory.
 *
 * The mapper `m` and reducer `r` are serialized via `serializeClass` into files named after the
 * generated output directory (`<name>-Map.ser` / `<name>-Reduce.ser`). Their locations are stored
 * under the job keys `smr.job.mapper.file` / `smr.job.reducer.file`, and the job is wired to run
 * through `ClosureMapper` (as the map runner) and `ReduceWrapper` (as the reducer).
 * Input and output are both SequenceFiles.
 *
 * @param paths input paths for the job
 * @param m     mapper producing (K2, V2) pairs
 * @param r     reducer producing (K3, V3) pairs
 * @param mk2   manifest for the map output key type, mapped to a Writable class via `Magic`
 * @param mv2   manifest for the map output value type, mapped to a Writable class via `Magic`
 * @param mk3   manifest for the final output key type, mapped to a Writable class via `Magic`
 * @param mv3   manifest for the final output value type, mapped to a Writable class via `Magic`
 * @return the result of `outputPath.listFiles()` on the job's generated output directory.
 *         NOTE(review): Hadoop's `Path` has no `listFiles`, so this presumably comes from an
 *         implicit enrichment elsewhere in the project — confirm.
 */
private[hadoop] def runMapReduce[K1,V1,K2,V2,K3,V3](paths : Array[Path],
    m: Mapper[K1,V1,K2,V2],
    r: Reduce[K2,V2,K3,V3])
    (implicit mk2:Manifest[K2], mv2:Manifest[V2],
     mk3:Manifest[K3], mv3:Manifest[V3]) = {
  // Kept implicit as in the original.
  // NOTE(review): presumably consumed implicitly by a helper in scope
  // (e.g. whatever supplies `listFiles` below) — confirm before removing.
  implicit val jobConf: JobConf = new JobConf(conf, m.getFunClass)

  // A fresh output directory; its name doubles as the job name suffix and the serialized-closure file stem.
  val outputPath = genDir
  val stem = outputPath.getName
  jobConf.setJobName("SMR-" + stem)

  // Ship the mapper/reducer as serialized files and record where the job can find them.
  val mapperFile = serializeClass(jobConf, stem + "-Map.ser", m)
  val reducerFile = serializeClass(jobConf, stem + "-Reduce.ser", r)
  jobConf.set("smr.job.mapper.file", mapperFile.toString)
  jobConf.set("smr.job.reducer.file", reducerFile.toString)

  // Generic wrappers that run the user's closures (loaded from the files configured above).
  jobConf.setMapRunnerClass(classOf[ClosureMapper[_,_,_,_]])
  jobConf.setReducerClass(classOf[ReduceWrapper[_,_,_,_]])

  // Hadoop needs concrete Writable classes for intermediate and final key/value types.
  def writableFor(man: Manifest[_]) = Magic.classToWritableClass(man.erasure)
  jobConf.setMapOutputKeyClass(writableFor(mk2))
  jobConf.setMapOutputValueClass(writableFor(mv2))
  jobConf.setOutputKeyClass(writableFor(mk3))
  jobConf.setOutputValueClass(writableFor(mv3))

  jobConf.setInputFormat(classOf[SequenceFileInputFormat[_,_]])
  jobConf.setOutputFormat(classOf[SequenceFileOutputFormat[_,_]])
  FileInputFormat.setInputPaths(jobConf, paths: _*)
  FileOutputFormat.setOutputPath(jobConf, outputPath)

  // NOTE(review): per Hadoop's JobClient.runJob contract this blocks until the job finishes
  // and throws IOException if it fails — confirm for the Hadoop version in use.
  JobClient.runJob(jobConf)
  outputPath.listFiles()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment