Skip to content

Instantly share code, notes, and snippets.

@cloverrose
Created September 9, 2012 23:44
Show Gist options
  • Save cloverrose/3687970 to your computer and use it in GitHub Desktop.
Save cloverrose/3687970 to your computer and use it in GitHub Desktop.
MapReduce
package org.myorg;
import java.io.IOException;
import java.util.*;
import java.lang.Math;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
/**
 * Hadoop job (old {@code mapred} API) that counts how often tracks are listened to close in
 * time to one another by the same user.
 *
 * <p>Input is read with {@link KeyValueTextInputFormat}: each line is split at the first
 * separator into a key, which is ignored, and a value of the form
 * {@code UserId,TrackId,ListenTimestamp}. Output records are {@code TrackId:TrackId -> count}.
 */
public class AutoDj {
    /** Two listens co-occur when their timestamps differ by strictly less than this. */
    private static final int THRESHOLD = 10;

    /** Counter group used to report skipped (malformed) input records. */
    private static final String COUNTER_GROUP = "AutoDj";

    /** One parsed listen event: which track was played and its timestamp as given in the input. */
    private static final class Listen {
        final String trackId;
        final long timestamp;

        Listen(String trackId, long timestamp) {
            this.trackId = trackId;
            this.timestamp = timestamp;
        }
    }

    /**
     * Re-keys each listen record by user.
     *
     * <p>Input value {@code UserId,TrackId,ListenTimestamp} becomes key {@code UserId} and value
     * {@code TrackId,ListenTimestamp}. Values with fewer than three comma-separated fields are
     * skipped and counted under {@code MALFORMED_MAP_INPUT} instead of failing the task.
     */
    public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        // Reused across calls; OutputCollector.collect serializes the contents immediately.
        private final Text userId = new Text();
        private final Text trackAndTime = new Text();

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                reporter.incrCounter(COUNTER_GROUP, "MALFORMED_MAP_INPUT", 1);
                return;
            }
            userId.set(fields[0].trim());
            trackAndTime.set(fields[1].trim() + "," + fields[2].trim());
            output.collect(userId, trackAndTime);
        }
    }

    /**
     * Counts, for each listen of one user, the nearby listens of every track.
     *
     * <p>Input: key {@code UserId}, values {@code TrackId,ListenTimestamp}. For every listen
     * {@code (t, ts)} of the user it emits {@code t:u -> n} for each track {@code u}, where
     * {@code n} is the number of that user's listens of {@code u} whose timestamp differs from
     * {@code ts} by less than {@link AutoDj#THRESHOLD}.
     *
     * <p>Two behaviours callers should be aware of:
     * <ul>
     *   <li>A listen is compared with itself, so {@code t:t} is always emitted with a count of at
     *       least 1.</li>
     *   <li>Counts are not summed per key here, so the same {@code t:u} key may appear many times
     *       in the job output.</li>
     * </ul>
     *
     * <p>Values that cannot be parsed are skipped and counted under
     * {@code MALFORMED_REDUCE_INPUT}.
     *
     * <p>This class must not be used as a combiner. It changes the value type and needs every
     * listen of a user in a single call.
     */
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
        // Reused across emissions; collect() serializes immediately.
        private final Text pairKey = new Text();
        private final IntWritable pairCount = new IntWritable();

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            List<Listen> listens = parseListens(values, reporter);
            for (Listen anchor : listens) {
                java.util.Map<String, Integer> counts = countNearbyTracks(anchor, listens);
                for (java.util.Map.Entry<String, Integer> e : counts.entrySet()) {
                    pairKey.set(anchor.trackId + ":" + e.getKey());
                    pairCount.set(e.getValue());
                    output.collect(pairKey, pairCount);
                }
            }
        }

        /** Parses each {@code TrackId,Timestamp} value once, skipping and counting malformed ones. */
        private static List<Listen> parseListens(Iterator<Text> values, Reporter reporter) {
            List<Listen> listens = new ArrayList<Listen>();
            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                if (fields.length < 2) {
                    reporter.incrCounter(COUNTER_GROUP, "MALFORMED_REDUCE_INPUT", 1);
                    continue;
                }
                try {
                    // long, not int: millisecond-style timestamps overflow an int.
                    listens.add(new Listen(fields[0], Long.parseLong(fields[1].trim())));
                } catch (NumberFormatException e) {
                    reporter.incrCounter(COUNTER_GROUP, "MALFORMED_REDUCE_INPUT", 1);
                }
            }
            return listens;
        }

        /** Counts, per track, the listens within THRESHOLD of {@code anchor}, anchor included. */
        private static java.util.Map<String, Integer> countNearbyTracks(Listen anchor, List<Listen> listens) {
            java.util.Map<String, Integer> counts = new HashMap<String, Integer>();
            for (Listen other : listens) {
                if (Math.abs(anchor.timestamp - other.timestamp) < THRESHOLD) {
                    Integer previous = counts.get(other.trackId);
                    counts.put(other.trackId, previous == null ? 1 : previous + 1);
                }
            }
            return counts;
        }
    }

    /**
     * Configures the job and runs it with {@code JobClient.runJob}.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: AutoDj <input path> <output path>");
        }
        JobConf conf = new JobConf(AutoDj.class);
        conf.setJobName("autodj");
        // The map emits (Text, Text) but the job outputs (Text, IntWritable). Without these two
        // calls the map output classes default to the final output classes and the job fails.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // No combiner: Reduce changes the value type and needs all of a user's listens at once.
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
1 U1,T1,1000
2 U1,T2,1020
3 U1,T3,1040
4 U1,T4,2000
5 U1,T1,2020
6 U2,T3,1000
7 U2,T4,1020
8 U2,T1,1040
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment