Skip to content

Instantly share code, notes, and snippets.

@jnatkins
Created November 9, 2012 17:54
Show Gist options
  • Save jnatkins/4047144 to your computer and use it in GitHub Desktop.
package com.cloudera.nile.etl;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.cloudera.nile.etl.SessionRecord.SessionEvent;
import com.google.common.collect.Lists;
/**
 * MapReduce job (old {@code mapred} API) that parses Apache httpd combined-format
 * access logs, groups events by cookie, and emits one Avro {@code SessionRecord}
 * per cookie keyed by {@code "<cookie>_<firstEventMillis>"}.
 */
public class NileWebLogProcessor implements Tool {

  /**
   * Regex for combined-format log lines. Capture groups:
   * 1=client IP, 2=identd, 3=remote user, 4=timestamp, 5=HTTP method,
   * 6=resource, 7=protocol, 8=response code, 9=response size, 10=referrer,
   * 11=user agent, 12=cookie.
   */
  private static final String LOG_PATTERN =
      "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:\\. /]+\\s[+\\-]\\d{4})\\] " +
      "\"([^\\s]+) ([^\\s]+) ([^\"]+)\" (\\d{3}) (\\d+) \"([^\"]+)\" " +
      "\"([^\"]+)\" \"([^\"]+)\"";

  // Compiled once: Pattern is immutable and thread-safe. The original compiled
  // this regex inside map() for every input record.
  private static final Pattern LOG_REGEX = Pattern.compile(LOG_PATTERN);

  private static final String LOG_DATE_FORMAT_STRING = "yyyy/MM/dd HH:mm:ss.SSS Z";

  // NOTE(review): SimpleDateFormat is NOT thread-safe. Sharing static instances
  // is only safe because a MapReduce task JVM runs one task thread by default;
  // do not enable multithreaded mappers without making these per-thread.
  private static final SimpleDateFormat LOG_DATE_FORMAT =
      new SimpleDateFormat(LOG_DATE_FORMAT_STRING);
  private static final SimpleDateFormat OUTPUT_DATE_FORMAT =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  static {
    // Normalize output timestamps to UTC once, instead of mutating the shared
    // formatter on every map() call as the original did.
    OUTPUT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
  }

  /** Configuration injected by ToolRunner via setConf(); used in run(). */
  private Configuration conf;

  /**
   * Parses one log line, keys it by cookie (group 12), and emits the remaining
   * fields as a single tab-separated value. Lines that do not match the log
   * pattern are silently skipped.
   */
  static class HttpdLogParserMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Utf8>> {

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<AvroKey<Utf8>, AvroValue<Utf8>> output, Reporter reporter)
        throws IOException {
      Matcher matcher = LOG_REGEX.matcher(value.toString());
      if (!matcher.matches()) {
        return; // malformed line: skip rather than fail the task
      }
      Date timestamp;
      try {
        timestamp = LOG_DATE_FORMAT.parse(matcher.group(4));
      } catch (ParseException e) {
        // Fixed message typo ("Count not") and preserved the cause.
        throw new IOException("Could not parse log date: " + matcher.group(4), e);
      }
      String outTimestamp = OUTPUT_DATE_FORMAT.format(timestamp);
      Utf8 cookie = new Utf8(matcher.group(12)); // Cookie
      // Tab-separated value; field indices 0..10 after split (see reducer).
      Utf8 outVal = new Utf8(matcher.group(1) + "\t" + // 0: IP addr
          matcher.group(2) + "\t" +                    // 1: identd (unused)
          matcher.group(3) + "\t" +                    // 2: remote user (unused)
          outTimestamp + "\t" +                        // 3: Timestamp
          matcher.group(5) + "\t" +                    // 4: Method
          matcher.group(6) + "\t" +                    // 5: Resource
          matcher.group(7) + "\t" +                    // 6: Protocol
          matcher.group(8) + "\t" +                    // 7: Response Code
          matcher.group(9) + "\t" +                    // 8: Response Size
          matcher.group(10) + "\t" +                   // 9: Referrer
          matcher.group(11));                          // 10: User Agent
      output.collect(new AvroKey<Utf8>(cookie), new AvroValue<Utf8>(outVal));
    }
  }

  /**
   * Collects all events for one cookie into a SessionRecord, keyed by the
   * cookie plus the epoch millis of the first event seen.
   */
  static class HttpdLogParserReducer
      extends AvroReducer<Utf8, Utf8, Pair<Utf8, SessionRecord>> {

    @Override
    public void reduce(Utf8 key, Iterable<Utf8> values,
        AvroCollector<Pair<Utf8, SessionRecord>> collector,
        Reporter reporter) throws IOException {
      String sessionKey = key + "_";
      SessionRecord record = new SessionRecord();
      // NOTE(review): start/end are placeholders in the original; presumably
      // meant to be filled from the first/last event timestamps — confirm.
      record.startTime = "start";
      record.endTime = "end";
      record.cookie = key.toString();
      record.events = Lists.newArrayList();
      int numEvents = 0;
      for (Utf8 value : values) {
        // The mapper emits 11 tab-separated fields, indices 0..10. The
        // original indexed 1..11, which shifted every field by one and read
        // past the end of the array (fields[11]) — fixed here.
        String fields[] = value.toString().split("\t");
        if (numEvents == 0) {
          try {
            sessionKey += OUTPUT_DATE_FORMAT.parse(fields[3]).getTime();
          } catch (ParseException e) {
            // Fail the task with context instead of swallowing the error
            // (original called printStackTrace() and emitted a bad key).
            throw new IOException(
                "Could not parse event timestamp: " + fields[3], e);
          }
        }
        SessionEvent event = new SessionEvent();
        event.ipAddr = fields[0];
        event.timestamp = fields[3];
        event.method = fields[4];
        event.resource = fields[5];
        event.protocol = fields[6];
        event.responseCode = Integer.valueOf(fields[7]);
        event.responseSize = Integer.valueOf(fields[8]);
        event.referrer = fields[9];
        event.userAgent = fields[10];
        record.events.add(event);
        numEvents++;
      }
      record.numEvents = numEvents;
      Pair<Utf8, SessionRecord> datum =
          new Pair<Utf8, SessionRecord>(new Utf8(sessionKey), record);
      collector.collect(datum);
    }
  }

  @Override
  public void setConf(Configuration conf) {
    // Store the configuration so run() can honor -D/-conf options passed via
    // ToolRunner (the original discarded it, breaking the Tool contract).
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code [inputPath, outputPath]}
   * @return 0 on success, -1 on bad usage
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println(
          "Usage: NileWebLogProcessor <input path> <output path>");
      return -1;
    }
    // Seed the JobConf from the ToolRunner-injected configuration when present.
    JobConf jobConf = (conf == null)
        ? new JobConf(NileWebLogProcessor.class)
        : new JobConf(conf, NileWebLogProcessor.class);
    FileInputFormat.addInputPath(jobConf, new Path(args[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setMapperClass(HttpdLogParserMapper.class);
    AvroJob.setReducerClass(jobConf, HttpdLogParserReducer.class);
    Schema stringSchema = Schema.create(Type.STRING);
    Schema recordSchema = ReflectData.get().getSchema(SessionRecord.class);
    Schema outSchema = Pair.getPairSchema(stringSchema, recordSchema);
    AvroJob.setOutputSchema(jobConf, outSchema);
    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(),
        new NileWebLogProcessor(),
        args);
    System.exit(res);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment