Skip to content

Instantly share code, notes, and snippets.

@allen501pc
Created May 3, 2011 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allen501pc/953436 to your computer and use it in GitHub Desktop.
Save allen501pc/953436 to your computer and use it in GitHub Desktop.
SortDataPreprocessor
package SortData;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
// vv SortDataPreprocessor
/**
 * Map-only Hadoop job (old {@code org.apache.hadoop.mapred} API) that filters
 * text input records and rewrites them as a block-compressed, gzip-encoded
 * sequence file keyed by air temperature.
 *
 * <p>Usage: {@code SortDataPreprocessor [generic options] <input> <output>}
 */
public class SortDataPreprocessor extends Configured implements Tool {

  /**
   * Mapper that keeps only the records for which the parser reports a valid
   * temperature. The output key is the parsed air temperature; the output
   * value is the original input line, unchanged.
   */
  static class CleanerMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, IntWritable, Text> {

    // One parser per mapper instance; it is re-used for every input record.
    private final NcdcRecordParser parser = new NcdcRecordParser();

    /**
     * Parses one input line and emits it when its temperature is valid.
     *
     * @param key      input key supplied by the input format (not used here)
     * @param value    the raw input record
     * @param output   collector receiving (air temperature, raw record) pairs
     * @param reporter progress reporter (not used here)
     * @throws IOException if the collector fails to write the pair
     */
    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<IntWritable, Text> output, Reporter reporter)
        throws IOException {
      parser.parse(value);
      if (parser.isValidTemperature()) {
        output.collect(new IntWritable(parser.getAirTemperature()), value);
      }
    }
  }

  /**
   * Configures and runs the job synchronously.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} is the
   *             output path; any further arguments are ignored
   * @return 0 when the job completes, -1 when the required arguments are
   *         missing
   * @throws IOException if the job cannot be submitted or fails
   */
  @Override
  public int run(String[] args) throws IOException {
    // Validate before touching args[0]/args[1]: previously a missing argument
    // surfaced as an ArrayIndexOutOfBoundsException, and the later
    // "conf == null" guard could never trigger.
    if (args == null || args.length < 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>%n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    JobConf conf = new JobConf(getConf(), getClass());
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(CleanerMapper.class);
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(Text.class);
    // Zero reducers: mapper output is written straight to the output format.
    conf.setNumReduceTasks(0);

    // Write a sequence file, gzip-compressed at block granularity.
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(conf, true);
    SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);

    JobClient.runJob(conf);
    return 0;
  }

  /**
   * Command-line entry point; delegates to {@link ToolRunner} so that generic
   * Hadoop options are handled, then exits with the tool's return code.
   *
   * @param args command-line arguments
   * @throws Exception if the tool fails
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
    System.exit(exitCode);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment