Created
September 9, 2012 23:44
-
-
Save cloverrose/3687970 to your computer and use it in GitHub Desktop.
MapReduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.myorg; | |
import java.io.IOException; | |
import java.util.*; | |
import java.lang.Math; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.conf.*; | |
import org.apache.hadoop.io.*; | |
import org.apache.hadoop.mapred.*; | |
import org.apache.hadoop.util.*; | |
public class AutoDj { | |
private final static int THRESHOLD = 10; | |
public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> { | |
private Text word_key = new Text(); | |
private Text word_value = new Text(); | |
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { | |
// <Line, UserId,TrackId,ListenDatetime> | |
// <UserId, [(TrackId, ListendDatetime)]> | |
String line = value.toString(); | |
String splits[] = line.split(","); | |
word_key.set(splits[0]); | |
word_value.set(splits[1] + "," + splits[2]); | |
output.collect(word_key, word_value); | |
} | |
} | |
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> { | |
private Text word_key = new Text(); | |
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { | |
// <UserId, [(TrackId, ListenedDatetime)]> | |
// <(TrackId, TrackId), Count> | |
List<List<String>> temp = new ArrayList<List<String>>(); | |
while (values.hasNext()) { | |
String value = values.next().toString(); | |
String splits[] = value.split(","); | |
String trackId = splits[0]; | |
String datetime = splits[1]; // timestampと仮定 | |
List<String> pair = new ArrayList<String>(); | |
pair.add(trackId); | |
pair.add(datetime); | |
temp.add(pair); | |
} | |
for(List<String> pair : temp) { | |
String trackId = pair.get(0); | |
String datetimeS = pair.get(1); // timestampと仮定 | |
int datetime = Integer.parseInt(datetimeS); | |
HashMap<String, Integer> H = new HashMap<String, Integer>(); | |
for(List<String> pair2 : temp){ | |
String trackId2 = pair2.get(0); | |
String datetimeS2 = pair2.get(1); // timestampと仮定 | |
int datetime2 = Integer.parseInt(datetimeS2); | |
if(Math.abs(datetime - datetime2) < THRESHOLD){ | |
if(!H.containsKey(trackId2)){ | |
H.put(trackId2, 1); | |
}else{ | |
H.put(trackId2, H.get(trackId2) + 1); | |
} | |
} | |
} | |
for(java.util.Map.Entry<String, Integer> e : H.entrySet()){ | |
String newKey = trackId + ":" + e.getKey(); | |
word_key.set(newKey); | |
output.collect(word_key, new IntWritable(e.getValue())); | |
} | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
JobConf conf = new JobConf(AutoDj.class); | |
conf.setJobName("autodj"); | |
conf.setOutputKeyClass(Text.class); | |
conf.setOutputValueClass(IntWritable.class); | |
conf.setMapperClass(Map.class); | |
conf.setCombinerClass(Reduce.class); | |
conf.setReducerClass(Reduce.class); | |
conf.setInputFormat(KeyValueTextInputFormat.class); | |
conf.setOutputFormat(TextOutputFormat.class); | |
FileInputFormat.setInputPaths(conf, new Path(args[0])); | |
FileOutputFormat.setOutputPath(conf, new Path(args[1])); | |
JobClient.runJob(conf); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 U1,T1,1000 | |
2 U1,T2,1020 | |
3 U1,T3,1040 | |
4 U1,T4,2000 | |
5 U1,T1,2020 | |
6 U2,T3,1000 | |
7 U2,T4,1020 | |
8 U2,T1,1040 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment