Created
November 9, 2012 17:54
-
-
Save jnatkins/4047144 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.cloudera.nile.etl; | |
import java.io.IOException; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.TimeZone; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import org.apache.avro.Schema; | |
import org.apache.avro.Schema.Type; | |
import org.apache.avro.mapred.AvroCollector; | |
import org.apache.avro.mapred.AvroJob; | |
import org.apache.avro.mapred.AvroKey; | |
import org.apache.avro.mapred.AvroReducer; | |
import org.apache.avro.mapred.AvroValue; | |
import org.apache.avro.mapred.Pair; | |
import org.apache.avro.reflect.ReflectData; | |
import org.apache.avro.util.Utf8; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapred.FileInputFormat; | |
import org.apache.hadoop.mapred.FileOutputFormat; | |
import org.apache.hadoop.mapred.JobClient; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.MapReduceBase; | |
import org.apache.hadoop.mapred.Mapper; | |
import org.apache.hadoop.mapred.OutputCollector; | |
import org.apache.hadoop.mapred.Reducer; | |
import org.apache.hadoop.mapred.Reporter; | |
import org.apache.hadoop.mapred.TextInputFormat; | |
import org.apache.hadoop.mapred.TextOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import com.cloudera.nile.etl.SessionRecord.SessionEvent; | |
import com.google.common.collect.Lists; | |
public class NileWebLogProcessor implements Tool { | |
private static final String LOG_PATTERN = | |
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w:\\. /]+\\s[+\\-]\\d{4})\\] " + | |
"\"([^\\s]+) ([^\\s]+) ([^\"]+)\" (\\d{3}) (\\d+) \"([^\"]+)\" " + | |
"\"([^\"]+)\" \"([^\"]+)\""; | |
private static final String LOG_DATE_FORMAT_STRING = "yyyy/MM/dd HH:mm:ss.SSS Z"; | |
private static final SimpleDateFormat LOG_DATE_FORMAT = new SimpleDateFormat(LOG_DATE_FORMAT_STRING); | |
private static final SimpleDateFormat OUTPUT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); | |
static class HttpdLogParserMapper extends MapReduceBase | |
implements Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Utf8>> { | |
@Override | |
public void map(LongWritable key, Text value, | |
OutputCollector<AvroKey<Utf8>, AvroValue<Utf8>> output, Reporter reporter) | |
throws IOException { | |
Pattern p = Pattern.compile(LOG_PATTERN); | |
Matcher matcher = p.matcher(value.toString()); | |
if (!matcher.matches()) { | |
return; | |
} | |
OUTPUT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); | |
Date timestamp = null; | |
try { | |
timestamp = LOG_DATE_FORMAT.parse(matcher.group(4)); | |
} catch (ParseException e) { | |
throw new IOException("Count not parse log date: " + matcher.group(4), e); | |
} | |
String outTimestamp = OUTPUT_DATE_FORMAT.format(timestamp); | |
Utf8 cookie = new Utf8(matcher.group(12)); // Cookie | |
Utf8 outVal = new Utf8(matcher.group(1) + "\t" + // IP addr | |
matcher.group(2) + "\t" + // unused | |
matcher.group(3) + "\t" + // unused | |
outTimestamp + "\t" + // Timestamp | |
matcher.group(5) + "\t" + // Method | |
matcher.group(6) + "\t" + // Resource | |
matcher.group(7) + "\t" + // Protocol | |
matcher.group(8) + "\t" + // Response Code | |
matcher.group(9) + "\t" + // Response Size | |
matcher.group(10) + "\t" + // Referrer | |
matcher.group(11)); // User Agent | |
output.collect(new AvroKey<Utf8>(cookie), new AvroValue<Utf8>(outVal)); | |
} | |
} | |
static class HttpdLogParserReducer | |
extends AvroReducer<Utf8, Utf8, Pair<Utf8, SessionRecord>> { | |
@Override | |
public void reduce(Utf8 key, Iterable<Utf8> values, | |
AvroCollector<Pair<Utf8, SessionRecord>> collector, | |
Reporter reporter) throws IOException { | |
String sessionKey = key + "_"; | |
SessionRecord record = new SessionRecord(); | |
record.startTime = "start"; | |
record.endTime = "end"; | |
record.cookie = key.toString(); | |
record.events = Lists.newArrayList(); | |
int numEvents = 0; | |
for (Utf8 value : values) { | |
String fields[] = value.toString().split("\t"); | |
if (numEvents == 0) { | |
try { | |
sessionKey += OUTPUT_DATE_FORMAT.parse(fields[4]).getTime(); | |
} catch (ParseException e) { | |
e.printStackTrace(); | |
} | |
} | |
SessionEvent event = new SessionEvent(); | |
event.ipAddr = fields[1]; | |
event.timestamp = fields[4]; | |
event.method = fields[5]; | |
event.resource = fields[6]; | |
event.protocol = fields[7]; | |
event.responseCode = Integer.valueOf(fields[8]); | |
event.responseSize = Integer.valueOf(fields[9]); | |
event.referrer = fields[10]; | |
event.userAgent = fields[11]; | |
record.events.add(event); | |
numEvents++; | |
} | |
record.numEvents = numEvents; | |
Pair<Utf8, SessionRecord> datum = | |
new Pair<Utf8, SessionRecord>(new Utf8(sessionKey), record); | |
collector.collect(datum); | |
} | |
} | |
@Override | |
public void setConf(Configuration conf) { | |
} | |
@Override | |
public Configuration getConf() { | |
return null; | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 2) { | |
return -1; | |
} | |
JobConf jobConf = new JobConf(NileWebLogProcessor.class); | |
FileInputFormat.addInputPath(jobConf, new Path(args[0])); | |
FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); | |
jobConf.setInputFormat(TextInputFormat.class); | |
jobConf.setMapperClass(HttpdLogParserMapper.class); | |
AvroJob.setReducerClass(jobConf, HttpdLogParserReducer.class); | |
Schema stringSchema = Schema.create(Type.STRING); | |
Schema recordSchema = ReflectData.get().getSchema(SessionRecord.class); | |
Schema outSchema = Pair.getPairSchema(stringSchema, recordSchema); | |
AvroJob.setOutputSchema(jobConf, outSchema); | |
JobClient.runJob(jobConf); | |
return 0; | |
} | |
public static void main(String[] args) throws Exception { | |
int res = ToolRunner.run(new Configuration(), | |
new NileWebLogProcessor(), | |
args); | |
System.exit(res); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment