Skip to content

Instantly share code, notes, and snippets.

@iceboal
Last active December 11, 2015 01:59
Show Gist options
  • Save iceboal/4527447 to your computer and use it in GitHub Desktop.
Save iceboal/4527447 to your computer and use it in GitHub Desktop.
//Map Class
/**
 * Mapper declared with {@code LongWritable}/{@code Text} input and {@code Text}/{@code Text} output.
 *
 * NOTE(review): the body of {@code map} is not present in this snippet, so nothing can be said
 * here about what it emits.
 */
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
/**
 * Per-record map callback.
 *
 * @param key      input key (presumably the record offset supplied by the input format — confirm)
 * @param value    input value
 * @param output   collector for the emitted Text/Text pairs
 * @param reporter progress/status reporter
 * @throws IOException declared by the signature
 */
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
// NOTE(review): the bare "21" below is not a valid Java statement; it looks like a placeholder
// or paste residue standing in for the elided map logic — restore the real body before compiling.
21
}
}
//Reduce Class
/**
 * Reducer that adds up all {@code IntWritable} values received for a key and emits
 * a single (key, total) pair.
 */
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Sums the values of one key group.
     *
     * @param key      the key shared by all values in this group
     * @param values   iterator over the integer values for the key
     * @param output   collector that receives the (key, total) pair
     * @param reporter progress/status reporter (unused)
     * @throws IOException if the collector fails to accept the record
     */
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int total = 0;
        for (Iterator<IntWritable> it = values; it.hasNext(); ) {
            IntWritable current = it.next();
            total = total + current.get();
        }
        IntWritable result = new IntWritable(total);
        output.collect(key, result);
    }
}
//Main
/**
 * Driver for the "Behavior" job.
 *
 * Configures {@code Map} as the mapper and {@code Reduce} as both combiner and reducer,
 * uses text input/output formats with 600 reduce tasks, and then submits the job
 * through {@code JobClient.runJob}.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @throws Exception if job configuration or execution fails
 */
public static void main(String[] args) throws Exception {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
    // when the two required paths are not supplied.
    if (args == null || args.length < 2) {
        System.err.println("Usage: Behavior <input path> <output path>");
        System.exit(2);
    }

    JobConf conf = new JobConf(Behavior.class);
    conf.setJobName("Behavior");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setNumReduceTasks(600);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}
//is Numeric
/**
 * Tests whether every character of {@code str} is a digit according to
 * {@link Character#isDigit(char)}.
 *
 * Signs, decimal points and whitespace are not digits, so strings such as
 * {@code "-1"} or {@code "1.5"} yield {@code false}. The empty string yields
 * {@code true} because it contains no non-digit character; this matches the
 * previous behavior and is kept for backward compatibility.
 *
 * @param str the string to test; may be {@code null}
 * @return {@code false} if {@code str} is {@code null} or contains a non-digit
 *         character, {@code true} otherwise
 */
public static boolean isNumeric(String str) {
    // A null reference used to fail with a NullPointerException; treat it as "not numeric".
    if (str == null) {
        return false;
    }
    for (int i = 0; i < str.length(); i++) {
        if (!Character.isDigit(str.charAt(i))) {
            return false;
        }
    }
    return true;
}
//run
/**
 * Configures and runs the "Combine" job.
 *
 * The job uses {@code IdentityMapper} as the mapper and {@code Reduce} as the reducer,
 * reads input with {@code KeyValueTextInputFormat}, writes Text/Text output with
 * {@code TextOutputFormat}, and uses 600 reduce tasks.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @return {@code 0} once {@code JobClient.runJob} has returned, or {@code 2} when fewer
 *         than two arguments were supplied
 * @throws Exception if job configuration or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Report bad usage through the exit status instead of letting args[0]/args[1]
    // throw an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        System.err.println("Usage: Combine <input path> <output path>");
        return 2;
    }

    Configuration jobConf = getConf();
    JobConf conf = new JobConf(jobConf, Combine.class);
    conf.setJobName("Combine");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setNumReduceTasks(600);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
    return 0;
}
/**
 * Command-line entry point: hands a new {@code Combine} instance and a fresh
 * {@code Configuration} to {@code ToolRunner.run} and exits the JVM with the
 * status it returns.
 *
 * @param args command-line arguments passed through to {@code ToolRunner.run}
 * @throws Exception if {@code ToolRunner.run} throws
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new Combine(), args));
}
//Partition
/**
 * Partitioner that chooses the partition from the first component of a
 * {@code TextPair} key only, so keys sharing the same first component
 * always map to the same partition.
 */
public static class FirstPartitioner implements Partitioner<TextPair, Text> {
    /** No configuration is needed. */
    @Override
    public void configure(JobConf job) {
        // intentionally empty
    }

    /**
     * @param key           composite key; only {@code key.getFirst()} is used
     * @param value         ignored
     * @param numPartitions total number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        // Clear the sign bit so the modulo result can never be negative.
        int nonNegativeHash = key.getFirst().hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
/**
 * Comparator for {@code TextPair} keys that delegates the ordering to
 * {@code TextPair.compareTo}.
 */
public static class KeyComparator extends WritableComparator {
    /** Registers {@code TextPair} as the key class and asks the base class to create instances. */
    protected KeyComparator() {
        super(TextPair.class, true);
    }

    /**
     * Compares two keys as {@code TextPair} instances.
     *
     * @param w1 left-hand key, must be a {@code TextPair}
     * @param w2 right-hand key, must be a {@code TextPair}
     * @return the result of {@code TextPair.compareTo} on the two keys
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        return ((TextPair) w1).compareTo((TextPair) w2);
    }
}
/**
 * Comparator for {@code TextPair} keys that looks only at the first component,
 * so two keys with equal first components compare as equal.
 */
public static class GroupComparator extends WritableComparator {
    /** Registers {@code TextPair} as the key class and asks the base class to create instances. */
    protected GroupComparator() {
        super(TextPair.class, true);
    }

    /**
     * Compares the first components of two {@code TextPair} keys.
     *
     * @param w1 left-hand key, must be a {@code TextPair}
     * @param w2 right-hand key, must be a {@code TextPair}
     * @return the result of comparing {@code getFirst()} of both keys
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        final TextPair left = (TextPair) w1;
        final TextPair right = (TextPair) w2;
        return left.getFirst().compareTo(right.getFirst());
    }
}
/**
 * Snippet showing how the partitioner and the two comparators are registered on a job.
 *
 * NOTE(review): {@code conf} is not declared in this fragment; presumably it is the
 * {@code JobConf} built by the surrounding driver — confirm before reuse.
 *
 * @param args unused in this fragment
 * @throws Exception declared by the signature
 */
public static void main(String[] args) throws Exception {
    // Keys are grouped for a single reduce call by GroupComparator (first component only).
    conf.setOutputValueGroupingComparator(GroupComparator.class);
    // Keys are ordered by KeyComparator (delegates to TextPair.compareTo).
    conf.setOutputKeyComparatorClass(KeyComparator.class);
    // Records are routed to partitions by FirstPartitioner (first component only).
    conf.setPartitionerClass(FirstPartitioner.class);
}
//iterate
/**
 * Builds the input and output locations from {@code I} and {@code n}, launches the
 * job through {@code getPageRankJob}, and waits for it to complete.
 *
 * NOTE(review): {@code I} and {@code n} are not declared in this fragment; presumably
 * they come from an enclosing loop or field — confirm against the full driver.
 *
 * @param args unused in this fragment
 * @throws Exception if the job cannot be configured or run
 */
public static void main(String[] args) throws Exception {
    String inputGlob = "/mblog_content/ds=20111" + I + "31/*";
    String outputDir = "TagDay/tag" + n + "/";

    RunningJob job = getPageRankJob(inputGlob, outputDir);
    job.waitForCompletion();
}
/**
 * Configures and runs the "Tag" job for one input/output pair.
 *
 * The job uses {@code Map} as the mapper and {@code Reduce} as the reducer, text
 * input/output formats, Text/IntWritable output types and 600 reduce tasks. Any
 * existing content at {@code outPath} is deleted recursively before the job is
 * submitted.
 *
 * @param inPath  input path (may be a glob) passed to {@code FileInputFormat}
 * @param outPath output directory; removed first if it already exists
 * @return the {@code RunningJob} handle returned by {@code JobClient.runJob}
 * @throws Exception if the output path cannot be deleted or the job fails
 */
public static RunningJob getPageRankJob(String inPath, String outPath) throws Exception {
    JobConf conf = new JobConf(Tag.class);
    conf.setJobName("Tag");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setNumReduceTasks(600);

    Path outputPath = new Path(outPath);
    FileInputFormat.setInputPaths(conf, new Path(inPath));
    FileOutputFormat.setOutputPath(conf, outputPath);

    // Delete the output directory if it already exists. Resolve the file system from
    // the path itself so a fully-qualified outPath on a non-default file system is
    // handled by the file system that actually owns it.
    FileSystem fs = outputPath.getFileSystem(conf);
    fs.delete(outputPath, true);

    return JobClient.runJob(conf);
}
//MultipleTextOutputFormat
/**
 * Multiple-output text format that routes records whose value starts with
 * {@code "~"} into an {@code output/} subdirectory and leaves every other
 * record in the default file.
 */
public static class OutputMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
    /** Lazily created delegate used to build the underlying record writers. */
    private TextOutputFormat<Text, Text> output = null;

    /**
     * Creates the record writer for one output file by delegating to a
     * {@code TextOutputFormat} that is created on first use.
     *
     * @param fs   file system passed through to the delegate
     * @param job  job configuration
     * @param name name of the file to write
     * @param arg3 progress callback
     * @return the record writer produced by the delegate
     * @throws IOException if the delegate cannot create the writer
     */
    @Override
    protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        if (output == null) {
            output = new TextOutputFormat<Text, Text>();
        }
        return output.getRecordWriter(fs, job, name, arg3);
    }

    /**
     * Chooses the output file name for a record.
     *
     * @param key   record key (unused)
     * @param value record value; a leading {@code "~"} selects the {@code output/} directory
     * @param name  default file name
     * @return {@code "output/" + name} when the value starts with {@code "~"}, otherwise {@code name}
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // startsWith is safe for an empty value, whereas substring(0, 1) would throw
        // StringIndexOutOfBoundsException.
        if (value.toString().startsWith("~")) {
            return "output/" + name;
        }
        return name;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment