package iv3;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that emits, for every distinct value of a grouping column, the
 * {@code rank.limit} records with the smallest (ascending) or largest
 * (descending) integer value in a ranking column.
 *
 * <p>Settings may be given positionally or through generic options
 * ({@code -D key=value}):
 * <ul>
 *   <li>{@code column.delimiter} - Java regular expression used to split a record into columns</li>
 *   <li>{@code rank.keys} - 1-based index of the grouping column</li>
 *   <li>{@code column.to.rank} - 1-based index of the integer column to rank by</li>
 *   <li>{@code rank.limit} - maximum number of records emitted per group (at least 1)</li>
 *   <li>{@code sort.ascending} - {@code true} (default) keeps the smallest values,
 *       {@code false} keeps the largest</li>
 * </ul>
 */
public class TopValues extends Configured implements Tool {

    private static final String COLUMN_DELIMITER_KEY = "column.delimiter";
    private static final String RANK_KEYS_KEY = "rank.keys";
    private static final String COLUMN_TO_RANK_KEY = "column.to.rank";
    private static final String RANK_LIMIT_KEY = "rank.limit";
    private static final String SORT_ASCENDING_KEY = "sort.ascending";

    /** Exit code returned by {@link #run(String[])} when the arguments or settings are invalid. */
    private static final int EXIT_USAGE = 2;

    /**
     * Returns the configured column delimiter.
     *
     * @throws IllegalArgumentException if {@code column.delimiter} is not set
     */
    private static String requireDelimiter(Configuration conf) {
        String delimiter = conf.get(COLUMN_DELIMITER_KEY);
        if (delimiter == null) {
            throw new IllegalArgumentException("Missing required setting: " + COLUMN_DELIMITER_KEY);
        }
        return delimiter;
    }

    /**
     * Reads an integer setting that must be present and at least 1.
     *
     * <p>Rejecting absent values here matters because the callers subtract 1
     * from the result; subtracting from the {@code Integer.MIN_VALUE} sentinel
     * would silently wrap around to {@code Integer.MAX_VALUE}.
     *
     * @throws IllegalArgumentException if the setting is absent, not an integer, or below 1
     */
    private static int requirePositiveInt(Configuration conf, String key) {
        int value = conf.getInt(key, Integer.MIN_VALUE);
        if (value < 1) {
            throw new IllegalArgumentException(key + " must be set to an integer >= 1");
        }
        return value;
    }

    /**
     * Keys every input record by the value of its grouping column and passes
     * the untouched record through as the map output value.
     */
    public static class TopValuesMapper extends Mapper<LongWritable, Text, Text, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        boolean sort_ascending;

        /** Delimiter compiled once per task instead of once per record. */
        private Pattern delimiterPattern;
        /** Reused output key; {@code context.write} is called before the next {@code set}. */
        private final Text groupKey = new Text();

        /**
         * Loads the job settings.
         *
         * @throws IllegalArgumentException if the delimiter or the grouping column is missing or invalid
         */
        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = requireDelimiter(conf);
            delimiterPattern = Pattern.compile(column_delimiter);
            rank_key = requirePositiveInt(conf, RANK_KEYS_KEY) - 1;
            // The map phase does not use the remaining settings; they are still
            // loaded so the existing fields keep holding the configured values.
            column_to_rank = conf.getInt(COLUMN_TO_RANK_KEY, Integer.MIN_VALUE) - 1;
            rank_limit = conf.getInt(RANK_LIMIT_KEY, Integer.MIN_VALUE);
            sort_ascending = conf.getBoolean(SORT_ASCENDING_KEY, true);
        }

        /**
         * Emits {@code (grouping column, whole record)}.
         *
         * @throws IOException if the record has no grouping column
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] columns = delimiterPattern.split(value.toString());
            if (rank_key >= columns.length) {
                throw new IOException("Record with key " + key + " has " + columns.length
                        + " column(s) but grouping column " + (rank_key + 1)
                        + " was requested: " + value);
            }
            groupKey.set(columns[rank_key]);
            context.write(groupKey, value);
        }
    }

    /**
     * For each group, retains at most {@code rank.limit} records ordered by the
     * ranking column and writes them in rank order.
     */
    public static class TopValuesReducer extends Reducer<Text, Text, NullWritable, Text> {
        String column_delimiter;
        int column_to_rank;
        int rank_limit, rank_key;
        boolean sort_ascending;

        /** Delimiter compiled once per task instead of once per record. */
        private Pattern delimiterPattern;
        /** Reused output value; {@code context.write} is called before the next {@code set}. */
        private final Text outputRecord = new Text();

        /**
         * Loads the job settings.
         *
         * @throws IllegalArgumentException if a required setting is missing or invalid
         */
        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            column_delimiter = requireDelimiter(conf);
            delimiterPattern = Pattern.compile(column_delimiter);
            column_to_rank = requirePositiveInt(conf, COLUMN_TO_RANK_KEY) - 1;
            rank_limit = requirePositiveInt(conf, RANK_LIMIT_KEY);
            sort_ascending = conf.getBoolean(SORT_ASCENDING_KEY, true);
            // Not used by the reduce phase; loaded so the existing field keeps its value.
            rank_key = conf.getInt(RANK_KEYS_KEY, Integer.MIN_VALUE) - 1;
        }

        /**
         * Selects the best {@code rank_limit} records of one group and writes them.
         *
         * <p>Records are bucketed per rank value, so records that tie on the
         * ranking column are all candidates rather than overwriting each other.
         * The map is ordered best-first (natural order when ascending, reversed
         * when descending); the worst record is therefore always in the last
         * bucket, and iteration yields the output in the requested direction.
         *
         * @throws IOException if a record lacks the ranking column or it is not an integer
         */
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            TreeMap<Integer, List<String>> ranked = sort_ascending
                    ? new TreeMap<Integer, List<String>>()
                    : new TreeMap<Integer, List<String>>(Collections.<Integer>reverseOrder());
            int retained = 0;

            for (Text val : value) {
                // Copy to a String: the framework reuses the Text instance between iterations.
                String record = val.toString();
                Integer rankValue = Integer.valueOf(parseRankValue(record));

                List<String> bucket = ranked.get(rankValue);
                if (bucket == null) {
                    bucket = new ArrayList<String>();
                    ranked.put(rankValue, bucket);
                }
                bucket.add(record);
                retained++;

                if (retained > rank_limit) {
                    // Drop the newest record of the worst bucket, keeping memory bounded
                    // by rank_limit records per group.
                    Map.Entry<Integer, List<String>> worst = ranked.lastEntry();
                    List<String> worstBucket = worst.getValue();
                    worstBucket.remove(worstBucket.size() - 1);
                    if (worstBucket.isEmpty()) {
                        ranked.remove(worst.getKey());
                    }
                    retained--;
                }
            }

            for (List<String> bucket : ranked.values()) {
                for (String record : bucket) {
                    outputRecord.set(record);
                    context.write(NullWritable.get(), outputRecord);
                }
            }
        }

        /**
         * Extracts the integer ranking value from a record.
         *
         * @throws IOException if the ranking column is absent or not a valid integer
         */
        private int parseRankValue(String record) throws IOException {
            String[] columns = delimiterPattern.split(record);
            if (column_to_rank >= columns.length) {
                throw new IOException("Record has " + columns.length
                        + " column(s) but ranking column " + (column_to_rank + 1)
                        + " was requested: " + record);
            }
            try {
                return Integer.parseInt(columns[column_to_rank]);
            } catch (NumberFormatException e) {
                throw new IOException("Ranking column " + (column_to_rank + 1)
                        + " is not an integer in record: " + record, e);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * <p>Accepts either all seven positional arguments, or just the input and
     * output paths when the remaining settings were supplied as generic
     * options ({@code -D key=value}).
     *
     * @param args {@code <input> <output> [<delimiter> <rank-key-column>
     *             <column-to-rank> <limit> <ascending>]}
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments or settings are invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Start from the configuration ToolRunner populated so generic options
        // (-D, -conf, -fs, ...) are honoured; fall back to a fresh one when
        // run() is invoked directly without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
            setConf(conf);
        }
        String[] remaining_args = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (remaining_args.length == 7) {
            conf.set(COLUMN_DELIMITER_KEY, remaining_args[2]);
            conf.set(RANK_KEYS_KEY, remaining_args[3]);
            conf.set(COLUMN_TO_RANK_KEY, remaining_args[4]);
            conf.set(RANK_LIMIT_KEY, remaining_args[5]);
            conf.setBoolean(SORT_ASCENDING_KEY, Boolean.parseBoolean(remaining_args[6]));
        } else if (remaining_args.length != 2) {
            return usageError("Expected 2 or 7 arguments but got " + remaining_args.length);
        }

        // Validate on the client so a bad setting fails immediately instead of
        // inside every task attempt.
        try {
            Pattern.compile(requireDelimiter(conf));
            requirePositiveInt(conf, RANK_KEYS_KEY);
            requirePositiveInt(conf, COLUMN_TO_RANK_KEY);
            requirePositiveInt(conf, RANK_LIMIT_KEY);
        } catch (IllegalArgumentException e) {
            return usageError("Invalid job settings: " + e.getMessage());
        }

        conf.set("hadoop.job.history.user.location", "none");

        // The Job is created from conf here, after every conf.set(...) call above.
        Job job = new Job(conf, getClass().getSimpleName());
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(remaining_args[0]));
        FileOutputFormat.setOutputPath(job, new Path(remaining_args[1]));

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(TopValuesMapper.class);
        job.setReducerClass(TopValuesReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Prints the problem and the command-line usage to stderr and returns the usage exit code. */
    private int usageError(String problem) {
        System.err.println(problem);
        System.err.println("Usage: " + getClass().getSimpleName()
                + " [generic options] <input> <output>"
                + " [<delimiter-regex> <rank-key-column> <column-to-rank> <limit> <ascending:true|false>]");
        System.err.println("Column numbers are 1-based. Settings omitted from the command line must be"
                + " supplied as -D " + COLUMN_DELIMITER_KEY + "=..., -D " + RANK_KEYS_KEY + "=...,"
                + " -D " + COLUMN_TO_RANK_KEY + "=..., -D " + RANK_LIMIT_KEY + "=...");
        ToolRunner.printGenericCommandUsage(System.err);
        return EXIT_USAGE;
    }

    /** Command-line entry point; exits with the code returned by {@link #run(String[])}. */
    public static void main(String[] args) throws Exception {
        int exit_code = ToolRunner.run(new TopValues(), args);
        System.exit(exit_code);
    }
}