Last active
December 11, 2015 01:59
-
-
Save iceboal/4527447 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Map class: mapper skeleton declared with LongWritable/Text input types and
// Text/Text output types.
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { | |
// Mapper entry point; any results would be emitted through `output`.
// NOTE(review): the body below is only the bare token `21`, which is not a valid
// Java statement - presumably the real mapper logic was elided from this snippet; confirm.
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { | |
21 | |
} | |
} | |
//Reduce Class | |
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { | |
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { | |
int sum = 0; | |
while (values.hasNext()) { | |
sum += values.next().get(); | |
} | |
output.collect(key, new IntWritable(sum)); | |
} | |
} | |
//Main | |
public static void main(String[] args) throws Exception { | |
JobConf conf = new JobConf(Behavior.class); | |
conf.setJobName("Behavior"); | |
conf.setOutputKeyClass(Text.class); | |
conf.setOutputValueClass(IntWritable.class); | |
conf.setMapperClass(Map.class); | |
conf.setCombinerClass(Reduce.class); | |
conf.setReducerClass(Reduce.class); | |
conf.setInputFormat(TextInputFormat.class); | |
conf.setOutputFormat(TextOutputFormat.class); | |
conf.setNumReduceTasks(600); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
FileOutputFormat.setOutputPath(conf, new Path(args[1])); | |
JobClient.runJob(conf); | |
} | |
//is Numeric | |
public static boolean isNumeric(String str){ | |
for (int i = str.length();--i>=0;){ | |
if (!Character.isDigit(str.charAt(i))){ | |
return false; | |
} | |
} | |
return true; | |
} | |
//run | |
@Override | |
public int run(String[] args) throws Exception { | |
Configuration jobConf =getConf(); | |
JobConf conf = new JobConf(jobConf, Combine.class); | |
conf.setJobName("Combine"); | |
conf.setOutputKeyClass(Text.class); | |
conf.setOutputValueClass(Text.class); | |
conf.setMapperClass(IdentityMapper.class); | |
conf.setReducerClass(Reduce.class); | |
conf.setInputFormat(KeyValueTextInputFormat.class); | |
conf.setOutputFormat(TextOutputFormat.class); | |
conf.setNumReduceTasks(600); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
FileOutputFormat.setOutputPath(conf, new Path(args[1])); | |
JobClient.runJob(conf); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int res = ToolRunner.run(new Configuration(), new Combine(), args); | |
System.exit(res); | |
} | |
//Partition | |
public static class FirstPartitioner implements Partitioner<TextPair, Text> { | |
@Override | |
public void configure(JobConf job) {} | |
@Override | |
public int getPartition(TextPair key, Text value, int numPartitions) { | |
return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions; | |
} | |
} | |
public static class KeyComparator extends WritableComparator { | |
protected KeyComparator() { | |
super(TextPair.class, true); | |
} | |
@SuppressWarnings("rawtypes") | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
TextPair tp1 = (TextPair) w1; | |
TextPair tp2 = (TextPair) w2; | |
int cmp = tp1.compareTo(tp2); | |
return cmp; | |
} | |
} | |
public static class GroupComparator extends WritableComparator { | |
protected GroupComparator() { | |
super(TextPair.class, true); | |
} | |
@SuppressWarnings("rawtypes") | |
@Override | |
public int compare(WritableComparable w1, WritableComparable w2) { | |
TextPair tp1 = (TextPair) w1; | |
TextPair tp2 = (TextPair) w2; | |
int cmp = tp1.getFirst().compareTo(tp2.getFirst()); | |
return cmp; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
conf.setPartitionerClass(FirstPartitioner.class); | |
conf.setOutputKeyComparatorClass(KeyComparator.class); | |
conf.setOutputValueGroupingComparator(GroupComparator.class); | |
} | |
//iterate | |
public static void main(String[] args) throws Exception { | |
RunningJob job = getPageRankJob("/mblog_content/ds=20111"+I+"31/*", "TagDay/tag"+n+"/"); | |
job.waitForCompletion(); | |
} | |
public static RunningJob getPageRankJob(String inPath,String outPath) throws Exception | |
{ | |
JobConf conf = new JobConf(Tag.class); | |
conf.setJobName("Tag"); | |
conf.setOutputKeyClass(Text.class); | |
conf.setOutputValueClass(IntWritable.class); | |
conf.setMapperClass(Map.class); | |
conf.setReducerClass(Reduce.class); | |
conf.setInputFormat(TextInputFormat.class); | |
conf.setOutputFormat(TextOutputFormat.class); | |
conf.setNumReduceTasks(600); | |
FileInputFormat.setInputPaths(conf, new Path(inPath)); | |
FileOutputFormat.setOutputPath(conf, new Path(outPath)); | |
FileSystem.get(conf).delete(new Path(outPath), true);//如果文件已存在删除 | |
return JobClient.runJob(conf); | |
} | |
//MultipleTextOutputFormat | |
public static class OutputMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> { | |
private TextOutputFormat<Text, Text> output = null; | |
@Override | |
protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { | |
if (output == null) { | |
output = new TextOutputFormat<Text, Text>(); | |
} | |
return output.getRecordWriter(fs, job, name, arg3); | |
} | |
@Override | |
protected String generateFileNameForKeyValue(Text key, Text value, String name) { | |
String filename = name; | |
if (value.toString().substring(0, 1).equals("~")) { | |
filename = "output/"+name;; | |
} | |
return filename; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment