Skip to content

Instantly share code, notes, and snippets.

@allen501pc
Created May 3, 2011 13:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allen501pc/953331 to your computer and use it in GitHub Desktop.
Save allen501pc/953331 to your computer and use it in GitHub Desktop.
MaxTemperatureWithCounters
package Counter;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
// vv MaxTemperatureWithCounters
public class MaxTemperatureWithCounters extends Configured implements Tool {
enum Temperature {
MISSING,
MALFORMED
}
static class MaxTemperatureMapperWithCounters extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
output.collect(new Text(parser.getYear()),
new IntWritable(airTemperature));
} else if (parser.isMalformedTemperature()) {
System.err.println("Ignoring possibly corrupt input: " + value);
reporter.incrCounter(Temperature.MALFORMED, 1);
} else if (parser.isMissingTemperature()) {
reporter.incrCounter(Temperature.MISSING, 1);
}
// dynamic counter
reporter.incrCounter("TemperatureQuality", parser.getQuality(), 1);
}
}
@Override
public int run(String[] args) throws IOException {
//JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
JobConf conf = new JobConf(getConf(), getClass());
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
if (conf == null) {
return -1;
}
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MaxTemperatureMapperWithCounters.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
System.exit(exitCode);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment