Skip to content

Instantly share code, notes, and snippets.

Created February 4, 2014 12:42
Show Gist options
  • Save anonymous/8802923 to your computer and use it in GitHub Desktop.
Save anonymous/8802923 to your computer and use it in GitHub Desktop.
TimeCount
package org.aboutyun;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
public class TimeCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "time_count");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
private Text id = new Text();
private Text row = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] items = line.split("\t");
if (items.length == 8) {
if (StringUtils.isNumeric(items[6])) {
id.set(items[0] + "\t" + items[1]);
row.set(line);
context.write(id, row);
}
} else {
System.out.println("Wrong length: " + items.length);
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
static {
format.setLenient(false);
}
private Text rest = new Text();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// Parse row to Record
ArrayList<Record> list = new ArrayList<Record>();
for (Text row : values) {
String[] items = row.toString().split("\t");
try {
Record record = new Record();
record.items = items;
record.start_time = format.parse(items[3]).getTime();
record.stay_time = Long.parseLong(items[6]) * 1000;
list.add(record);
} catch (ParseException e) {
e.printStackTrace();
}
}
// Sort
Collections.sort(list, new Comparator<Record>() {
@Override
public int compare(Record r1, Record r2) {
return (int) (r1.start_time - r2.start_time);
}
});
// Find and merge slice
ArrayList<Record> result = new ArrayList<Record>();
for (Record r1 : list) {
boolean found = false;
long r1_stop_time = r1.start_time + r1.stay_time;
for (Record r2 : result) {
long r2_stop_time = r2.start_time + r2.stay_time;
if (r1.start_time > r2.start_time && r1.start_time <= r2_stop_time && r1_stop_time > r2_stop_time) {
// merge the new slice
r2.stay_time = r1_stop_time - r2.start_time;
found = true;
}
}
if (!found) {
result.add(r1);
}
}
// Output
for (Record r : result) {
key.set(r.items[0]);
String value = r.items[1] + "\t"
+ r.items[2] + "\t"
+ r.items[3] + "\t"
+ r.items[4] + "\t"
+ r.items[5] + "\t"
+ (r.stay_time / 1000) + "\t"
+ r.items[6] + "\t";
rest.set(value);
context.write(key, rest);
}
}
static class Record {
String[] items;
long start_time;
long stay_time;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment