Skip to content

Instantly share code, notes, and snippets.

@piotrbelina
Created July 25, 2013 19:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save piotrbelina/6083056 to your computer and use it in GitHub Desktop.
Save piotrbelina/6083056 to your computer and use it in GitHub Desktop.
Cascading job that parses Apache access logs for boomerang.js beacon requests
package piotr;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.aggregator.Average;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import java.util.Properties;
/**
 * Entry point of a Cascading job that summarises boomerang.js beacon requests found in Apache
 * access logs.
 *
 * <p>The flow parses each log line into ip / time / method / event, keeps the lines whose request
 * target matches {@code beacon\.php}, expands the beacon query string with {@link UrlParser}, and
 * for every beacon {@code u} value writes the number of beacons together with the averages of
 * {@code t_done} and {@code t_resp}.
 */
public class Main {

    /**
     * Builds the flow and runs it to completion.
     *
     * @param args {@code args[0]} input log path, {@code args[1]} output stats path (replaced if it
     *     already exists), {@code args[2]} trap path for tuples that fail in the "import" branch
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     */
    public static void main(String[] args) {
        // Fail with a usage message instead of an anonymous ArrayIndexOutOfBoundsException.
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: piotr.Main <logPath> <statsPath> <trapPath> (got "
                            + args.length + " argument(s))");
        }
        String logPath = args[0];
        String statsPath = args[1];
        String trapPath = args[2];

        Properties properties = new Properties();
        // Lets Cascading locate the application jar through this class.
        AppProps.setApplicationJarClass(properties, Main.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        // Source: raw text lines; sink: stats as text, replaced on every run;
        // trap: tab-delimited with a header row.
        TextLine scheme = new TextLine(new Fields("offset", "line"));
        Tap logTap = new Hfs(scheme, logPath);
        Tap remoteLogTap = new Hfs(new TextLine(), statsPath, SinkMode.REPLACE);
        Tap trapTap = new Hfs(new TextDelimited(true, "\t"), trapPath);

        Fields apacheFields = new Fields("ip", "time", "method", "event");
        Fields urlFields = new Fields("ip", "time", "method", "event", "url", "done", "t_resp", "bw", "bw_err", "lat", "lat_err");

        // Groups: 1 = first token (client address), 2 = text inside [...] (time),
        // 3 = first token of the quoted request line (method), 4 = second token (request target).
        // The two skipped tokens between groups 1 and 2 are discarded.
        String apacheRegex = "([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*).*$";
        int[] allGroups = {1, 2, 3, 4};
        RegexParser parser = new RegexParser(apacheFields, apacheRegex, allGroups);
        Pipe importPipe = new Each("import", new Fields("line"), parser, Fields.RESULTS);

        Pipe logPipe = new Pipe("topurl", importPipe);
        // Keep only boomerang beacon requests.
        RegexFilter regexFilter = new RegexFilter("beacon\\.php");
        logPipe = new Each(logPipe, new Fields("event"), regexFilter);
        logPipe = new Each(logPipe, apacheFields, new UrlParser(urlFields), Fields.RESULTS);

        // Per beacon URL: number of beacons and average t_done / t_resp.
        logPipe = new GroupBy(logPipe, new Fields("url"));
        Aggregator count = new Count(new Fields("count"));
        Aggregator averageDone = new Average(new Fields("avg_done"));
        Aggregator averageResponse = new Average(new Fields("avg_response"));
        logPipe = new Every(logPipe, count);
        logPipe = new Every(logPipe, new Fields("done"), averageDone);
        logPipe = new Every(logPipe, new Fields("t_resp"), averageResponse);

        // Order the results by "count" with "url" as secondary sort field; reverseOrder = true
        // (presumably so the most frequent URLs come first).
        logPipe = new GroupBy(logPipe, new Fields("count"), new Fields("url"), true);

        // NOTE(review): the trap is bound only to the "import" branch. UrlParser runs in the
        // "topurl" branch, so a beacon it rejects presumably fails the whole flow rather than
        // landing in trapPath. Confirm whether that branch should be trapped too.
        FlowDef flowDef = FlowDef.flowDef()
                .setName("apachelog")
                .addSource(importPipe, logTap)
                .addTailSink(logPipe, remoteLogTap)
                .addTrap(importPipe, trapTap);

        Flow flow = flowConnector.connect(flowDef);
        // Writes a DOT graph of the planned flow (path relative to the working directory).
        flow.writeDOT("dot/apachelog.dot");
        flow.complete();
    }
}
package piotr;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Cascading {@link Function} that expands a boomerang.js beacon request into timing fields.
 *
 * <p>Takes four arguments, in order: ip, time, method and event. The event is the request
 * target whose query string carries the beacon parameters.
 *
 * <p>Emits one tuple holding those four values followed by the decoded query parameters
 * {@code u}, {@code t_done}, {@code t_resp}, {@code bw}, {@code bw_err}, {@code lat} and
 * {@code lat_err}, in that order. The emitted tuple therefore always has eleven values.
 */
public class UrlParser extends BaseOperation implements Function {

    /** Beacon query parameters copied into the output tuple, in output order. */
    private static final String[] BEACON_PARAMS =
            {"u", "t_done", "t_resp", "bw", "bw_err", "lat", "lat_err"};

    /**
     * Creates the function.
     *
     * @param fieldDeclaration names of the eleven output fields
     */
    public UrlParser(Fields fieldDeclaration) {
        // Exactly four input arguments: ip, time, method, event.
        super(4, fieldDeclaration);
    }

    /**
     * Parses the query string of {@code url} into a map from decoded name to decoded values.
     *
     * <p>The query is everything after the first {@code '?'}. Each {@code '&'}-separated segment
     * is split on its first {@code '='} only, so values may themselves contain {@code '='}. A
     * segment without {@code '='} maps its name to the empty string. Empty segments are ignored.
     * Repeated names keep every value in order of appearance.
     *
     * @param url request target such as {@code /path?a=1&b=2}; {@code null} or a string without
     *     {@code '?'} yields an empty map
     * @return a new mutable map, never {@code null}
     * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)};
     *     cannot occur for UTF-8, which every JVM must support
     * @throws IllegalArgumentException may be thrown by {@link URLDecoder} for malformed
     *     {@code %} escapes
     */
    public static Map<String, List<String>> getUrlParameters(String url)
            throws UnsupportedEncodingException {
        Map<String, List<String>> params = new HashMap<String, List<String>>();
        if (url == null) {
            return params;
        }
        int queryStart = url.indexOf('?');
        if (queryStart < 0) {
            return params;
        }
        // Everything after the first '?', so a literal '?' inside the query is preserved.
        String query = url.substring(queryStart + 1);
        for (String param : query.split("&")) {
            if (param.isEmpty()) {
                continue;
            }
            // Limit 2: split on the first '=' only so '=' inside a value is not lost.
            String[] pair = param.split("=", 2);
            String key = URLDecoder.decode(pair[0], "UTF-8");
            String value = pair.length > 1 ? URLDecoder.decode(pair[1], "UTF-8") : "";
            List<String> values = params.get(key);
            if (values == null) {
                values = new ArrayList<String>();
                params.put(key, values);
            }
            values.add(value);
        }
        return params;
    }

    /**
     * Returns the single element of {@code list}.
     *
     * @param list list expected to hold exactly one element; {@code null} is treated as empty
     * @return the only element
     * @throws IndexOutOfBoundsException if the list is {@code null}, empty, or has more than one
     *     element
     */
    public <E> E getFirst(List<E> list) {
        int size = (list == null) ? 0 : list.size();
        if (size != 1) {
            throw new IndexOutOfBoundsException("expected exactly one element but found " + size);
        }
        return list.get(0);
    }

    /**
     * Emits the expanded beacon tuple for one input tuple.
     *
     * @throws IndexOutOfBoundsException if any beacon parameter is missing or repeated
     * @throws IllegalStateException if the JVM does not support UTF-8
     */
    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        TupleEntry argument = functionCall.getArguments();
        String ip = argument.getString(0);
        String time = argument.getString(1);
        String method = argument.getString(2);
        String event = argument.getString(3);

        Map<String, List<String>> params;
        try {
            params = getUrlParameters(event);
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every JVM, so this indicates a broken runtime rather
            // than bad input; fail loudly instead of silently dropping the record.
            throw new IllegalStateException("UTF-8 charset is not supported", e);
        }

        Tuple result = new Tuple();
        result.add(ip);
        result.add(time);
        result.add(method);
        result.add(event);
        for (String name : BEACON_PARAMS) {
            result.add(requireSingle(params, name, event));
        }
        functionCall.getOutputCollector().add(result);
    }

    /**
     * Returns the only value of query parameter {@code name}.
     *
     * @throws IndexOutOfBoundsException if the parameter is absent or repeated; the message names
     *     the parameter and the request so the failing record can be identified
     */
    private static String requireSingle(Map<String, List<String>> params, String name, String event) {
        List<String> values = params.get(name);
        int count = (values == null) ? 0 : values.size();
        if (count != 1) {
            throw new IndexOutOfBoundsException("beacon parameter '" + name
                    + "' must appear exactly once but appears " + count + " time(s) in: " + event);
        }
        return values.get(0);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment