Skip to content

Instantly share code, notes, and snippets.

@muddydixon
Created August 29, 2012 02:07
Show Gist options
  • Save muddydixon/3506092 to your computer and use it in GitHub Desktop.
Save muddydixon/3506092 to your computer and use it in GitHub Desktop.
mongo-hadoop MongoMultipleInputs example
/**
 * Driver for a single MapReduce job that reads three MongoDB collections
 * ({@code multiple.in1}, {@code multiple.in2}, {@code multiple.in3}) on
 * {@code mongodb://localhost:27017}, each with its own mapper, and writes the
 * reduced output to {@code multiple.myout}.
 */
public class MultipleDriver extends Configured implements Tool {
    /** Logger for this driver (previously mis-bound to an unrelated class name). */
    public static final Log log =
        LogFactory.getLog(MultipleDriver.class);

    /**
     * Entry point: runs the job via {@link ToolRunner} and exits the JVM with
     * the exit code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments, forwarded to {@link ToolRunner}
     * @throws Exception if the job cannot be configured or run
     */
    public static void main(String[] args) throws Exception {
        log.info("*** start mapreduce ***");
        int exitCode = ToolRunner.run(new MultipleDriver(), args);
        log.info("**** end mapreduce ****");
        System.exit(exitCode);
    }

    /**
     * Configures the multi-input job, submits it, and blocks until it finishes.
     *
     * @param args arguments remaining after ToolRunner's option parsing; not used
     * @return {@code 0} if the job succeeded, {@code 1} otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // mongo-hadoop split/read settings; see MongoConfigUtil for their exact semantics.
        MongoConfigUtil.setShardChunkSplittingEnabled(conf, true);
        MongoConfigUtil.setReadSplitsFromShards(conf, false);
        MongoConfigUtil.setReadSplitsFromSecondary(conf, true);
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/multiple.myout");

        Job job = new Job(conf, "multipleJob");

        // One input per collection, each with its own mapper and (optional) query.
        MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in1",
            MongoInputFormat.class, MultipleMapper.class, "{name: {$lte: 10}}");
        MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in2",
            MongoInputFormat.class, MultipleMapper2.class, "{name: {$gt:10, $lte: 20}}");
        // NOTE(review): no query given here; presumably the whole collection is read — confirm.
        MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in3",
            MongoInputFormat.class, MultipleMapper3.class);

        // NOTE(review): assumes MongoMultipleInputs records its input specs under this key.
        log.info(job.getConfiguration().get("mongo.input.request"));

        job.setJarByClass(MultipleDriver.class);
        job.setReducerClass(MultipleReducer.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        job.setMapOutputKeyClass(BSONWritable.class);
        job.setMapOutputValueClass(BSONWritable.class);
        job.setOutputKeyClass(BSONWritable.class);
        job.setOutputValueClass(BSONWritable.class);

        // The return value becomes the process exit status (see main), so 0 must mean success.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
public class CountMapper extends Mapper {
public void map( Object key , BSONObject value , Context context ) throws ClassCastException,
IOException, InterruptedException{
context.write(new BSONWritable((BasicDBObject) key.get("name"), 1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment