Skip to content

Instantly share code, notes, and snippets.

@rishav-rohit
Created March 3, 2014 12:55
Show Gist options
  • Save rishav-rohit/9324374 to your computer and use it in GitHub Desktop.
Save rishav-rohit/9324374 to your computer and use it in GitHub Desktop.
package com.rishav.avro.mapreduce;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.rishav.avro.IntPair;
import com.rishav.avro.student_marks;
public class AvroAverageDriver extends Configured implements Tool{
public static class AvroAverageMapper extends
Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> {
protected void map(AvroKey<student_marks> key, NullWritable value, Context context)
throws IOException, InterruptedException {
IntWritable s_id = new IntWritable(key.datum().getStudentId());
IntPair marks_one = new IntPair(key.datum().getMarks(), 1);
context.write(s_id, marks_one);
}
} // end of mapper class
public static class AvroAverageCombiner extends
Reducer<IntWritable, IntPair, IntWritable, IntPair> {
IntPair p_sum_count = new IntPair();
Integer p_sum = new Integer(0);
Integer p_count = new Integer(0);
protected void reduce(IntWritable key, Iterable<IntPair> values, Context context)
throws IOException, InterruptedException {
p_sum = 0;
p_count = 0;
for (IntPair value : values) {
p_sum += value.getFirstInt();
p_count += value.getSecondInt();
}
p_sum_count.set(p_sum, p_count);
context.write(key, p_sum_count);
}
} // end of combiner class
public static class AvroAverageReducer extends
Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> {
Integer f_sum = 0;
Integer f_count = 0;
protected void reduce(IntWritable key, Iterable<IntPair> values, Context context)
throws IOException, InterruptedException {
f_sum = 0;
f_count = 0;
for (IntPair value : values) {
f_sum += value.getFirstInt();
f_count += value.getSecondInt();
}
Float average = (float)f_sum/f_count;
Integer s_id = new Integer(key.toString());
context.write(new AvroKey<Integer>(s_id), new AvroValue<Float>(average));
}
} // end of reducer class
@Override
public int run(String[] rawArgs) throws Exception {
if (rawArgs.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(super.getConf());
job.setJarByClass(AvroAverageDriver.class);
job.setJobName("Avro Average");
String[] args = new GenericOptionsParser(rawArgs).getRemainingArgs();
Path inPath = new Path(args[0]);
Path outPath = new Path(args[1]);
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(super.getConf()).delete(outPath, true);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setMapperClass(AvroAverageMapper.class);
AvroJob.setInputKeySchema(job, student_marks.getClassSchema());
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntPair.class);
job.setCombinerClass(AvroAverageCombiner.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
job.setReducerClass(AvroAverageReducer.class);
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new AvroAverageDriver(), args);
System.exit(result);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment