error:
$ time hadoop jar ./build/loganalysis.jar
09/05/21 13:59:00 INFO flow.MultiMapReducePlanner: using application jar: /home/anton/loganalysis/./build/loganalysis.jar
Exception in thread "main" java.lang.NullPointerException
    at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:211)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:434)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:406)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:373)
    at loganalysis.Main.main(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
real 0m0.819s
user 0m1.100s
sys 0m0.120s
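The trace doesn't say which reference is null inside the planner. In Cascading 1.x a NullPointerException from MultiMapReducePlanner.buildFlow() is usually a wiring problem: the planner can't pair a head or tail pipe with its tap, for instance when the name used to key the sinks map isn't the name the tail pipe actually carries. One thing worth ruling out (an assumption, not something this trace proves): `new Pipe( "url_merge" )` is overwritten on the next line by a CoGroup, so that CoGroup carries a generated name rather than "url_merge". Naming each CoGroup explicitly keeps the tail names deterministic, roughly (a minimal sketch, assuming the Cascading 1.x named CoGroup constructor):

    // name the join directly instead of via a throwaway Pipe
    Pipe merge = new CoGroup( "url_merge", organicCountPipe, lhsFields, paidCountPipe, rhsFields, new OuterJoin() );

The full source, with both joins named this way, follows.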
package loganalysis;

import java.util.Map;
import java.util.Properties;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.Sum;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.filter.Not;
import cascading.operation.regex.RegexParser;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.cogroup.OuterJoin;
import cascading.scheme.TextLine;
import cascading.tap.Dfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.hbase.*;
/**
 * Counts organic and paid hits per URL and per domain from a hit log,
 * then writes the daily summaries to HBase.
 */
public class Main
{
  public static void main( String[] args )
  {
    // set the current job jar
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );

    FlowConnector flowConnector = new FlowConnector( properties );
    CascadeConnector cascadeConnector = new CascadeConnector();

    String hitPath = "/logs/hits/short-hits-20090504.log";
    // create an assembly to parse the hit log
    // declares: "day", "urlid", "method"
    Fields logFields = new Fields( "day", "urlid", "method" );
    String logRegex = "^(.{10}) .{8} .*urlid\\\":(\\d+),.*method\\\":(\\d+),.*$";
    int[] logGroups = {1, 2, 3};
    RegexParser parser = new RegexParser( logFields, logRegex, logGroups );
    Pipe importPipe = new Each( "import", new Fields( "line" ), parser );

    // filter out any lines with an invalid urlid or method;
    // ExpressionFilter removes tuples for which the expression is true,
    // so either field being null discards the line
    importPipe = new Each( importPipe, new Fields( "urlid", "method" ), new ExpressionFilter( "urlid == null || method == null", String.class ) );
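    // from here on the tuple stream holds only the parser's declared fields:
    // "day", "urlid", "method" (Each with a Function defaults to Fields.RESULTS)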
    // create a tap to read the log from the distributed file system
    Tap hitLogTap = new Dfs( new TextLine(), hitPath );
    // count organic hits per (day, urlid)
    Pipe organicCountPipe = new Pipe( "organicCount", importPipe );
    organicCountPipe = new Each( organicCountPipe, new OrganicFilter() );
    organicCountPipe = new GroupBy( organicCountPipe, new Fields( "day", "urlid" ) );
    organicCountPipe = new Every( organicCountPipe, Fields.GROUP, new Count() );

    // count paid (non-organic) hits per (day, urlid)
    Pipe paidCountPipe = new Pipe( "paidCount", importPipe );
    paidCountPipe = new Each( paidCountPipe, new Not( new OrganicFilter() ) );
    paidCountPipe = new GroupBy( paidCountPipe, new Fields( "day", "urlid" ) );
    paidCountPipe = new Every( paidCountPipe, Fields.GROUP, new Count() );
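    // Count appends a "count" field holding the number of tuples in each group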
    // rename paid counter fields so we can merge them
    paidCountPipe = new Each( paidCountPipe, new Fields( "day", "urlid", "count" ), new Identity( new Fields( "paid_day", "paid_urlid", "paid_count" ) ) );
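    // (CoGroup rejects duplicate field names across its input streams,
    // so every paid-side field gets a distinct name before the joins below)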
    // roll the per-url counts up to per-domain sums
    Pipe organicDomainCountPipe = new Pipe( "organicDomainCount", organicCountPipe );
    organicDomainCountPipe = new Each( organicDomainCountPipe, new Fields( "urlid" ), new LookupDomainFunction(), Fields.ALL );
    organicDomainCountPipe = new GroupBy( organicDomainCountPipe, new Fields( "day", "domainid" ) );
    organicDomainCountPipe = new Every( organicDomainCountPipe, new Fields( "count" ), new Sum(), Fields.ALL );

    Pipe paidDomainCountPipe = new Pipe( "paidDomainCount", paidCountPipe );
    paidDomainCountPipe = new Each( paidDomainCountPipe, new Fields( "paid_urlid" ), new LookupDomainFunction(), Fields.ALL );
    paidDomainCountPipe = new GroupBy( paidDomainCountPipe, new Fields( "paid_day", "domainid" ) );
    paidDomainCountPipe = new Every( paidDomainCountPipe, new Fields( "paid_count" ), new Sum(), Fields.ALL );
    // rename paid domain merge fields so we can merge paid and organic domain counters
    paidDomainCountPipe = new Each( paidDomainCountPipe, new Fields( "paid_day", "domainid", "sum" ), new Identity( new Fields( "paid_day", "paid_domainid", "paid_sum" ) ) );
    // merge the organic and paid counter pipes on (day, urlid)
    Fields lhsFields = new Fields( "day", "urlid" );
    Fields rhsFields = new Fields( "paid_day", "paid_urlid" );
    // name the CoGroup directly; assigning a fresh Pipe( "url_merge" ) first
    // would be discarded by this reassignment and its name never used
    Pipe merge = new CoGroup( "url_merge", organicCountPipe, lhsFields, paidCountPipe, rhsFields, new OuterJoin() );
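    // OuterJoin emits null fields on whichever side has no matching group;
    // the counts are patched to "0" below, though note the key fields feeding
    // urlid_day can also be null for paid-only rows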
    // create a composite field to use as hbase key
    merge = new Each( merge, new Fields( "urlid", "day" ), new ExpressionFunction( new Fields( "urlid_day" ), "urlid + \":\" + day", String.class ), Fields.ALL );

    // get rid of any null fields
    merge = new Each( merge, new Fields( "count" ), new ExpressionFunction( new Fields( "fixed_count" ), "count == null ? \"0\" : count", String.class ), Fields.ALL );
    merge = new Each( merge, new Fields( "paid_count" ), new ExpressionFunction( new Fields( "fixed_paid_count" ), "paid_count == null ? \"0\" : paid_count", String.class ), Fields.ALL );

    // discard all fields we don't need, and rename the count and paid count to hbase field names
    merge = new Each( merge, new Fields( "urlid_day", "fixed_count", "fixed_paid_count" ), new Identity( new Fields( "urlid_day", "default:organic_hits", "default:paid_hits" ) ) );
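    // the "default:" prefix is assumed to be the hbase column family the
    // HBaseScheme below writes into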
    // merge the organic and paid domain counter pipes on (day, domainid)
    Fields lhsDMFields = new Fields( "day", "domainid" );
    Fields rhsDMFields = new Fields( "paid_day", "paid_domainid" );
    Pipe domainMerge = new CoGroup( "domain_merge", organicDomainCountPipe, lhsDMFields, paidDomainCountPipe, rhsDMFields, new OuterJoin() );
    // create a composite field to use as hbase key
    domainMerge = new Each( domainMerge, new Fields( "domainid", "day" ), new ExpressionFunction( new Fields( "domainid_day" ), "domainid + \":\" + day", String.class ), Fields.ALL );

    // get rid of any null fields
    domainMerge = new Each( domainMerge, new Fields( "sum" ), new ExpressionFunction( new Fields( "fixed_sum" ), "sum == null ? \"0\" : sum", String.class ), Fields.ALL );
    domainMerge = new Each( domainMerge, new Fields( "paid_sum" ), new ExpressionFunction( new Fields( "fixed_paid_sum" ), "paid_sum == null ? \"0\" : paid_sum", String.class ), Fields.ALL );

    // discard all fields we don't need, and rename the sum and paid sum to hbase field names
    domainMerge = new Each( domainMerge, new Fields( "domainid_day", "fixed_sum", "fixed_paid_sum" ), new Identity( new Fields( "domainid_day", "default:organic_hits", "default:paid_hits" ) ) );
    // sink the url and domain summaries into hbase
    Fields keyFields = new Fields( "urlid_day" );
    Fields[] valueFields = new Fields[]{ new Fields( "default:organic_hits", "default:paid_hits" ) };
    Tap logHBaseSinkTap = new HBaseTap( "url_hit_summary", new HBaseScheme( keyFields, valueFields ) );

    Fields domainKeyFields = new Fields( "domainid_day" );
    Fields[] domainValueFields = new Fields[]{ new Fields( "default:organic_hits", "default:paid_hits" ) };
    Tap domainHBaseSinkTap = new HBaseTap( "domain_hit_summary", new HBaseScheme( domainKeyFields, domainValueFields ) );
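    // HBaseScheme is assumed to treat the key field as the row key and each
    // value field name as a family:qualifier column, per cascading.hbase of this era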
    // bind each tail pipe to its hbase tap by pipe name
    Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( merge, domainMerge ), Tap.taps( logHBaseSinkTap, domainHBaseSinkTap ) );
    Flow hitDailyFlow = flowConnector.connect( hitLogTap, sinks, Pipe.pipes( merge, domainMerge ) );
    // optionally print out the hitDailyFlow to a graph file for import into a graphics package
    hitDailyFlow.writeDOT( "flow.dot" );
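    // the DOT file can be rendered with Graphviz, e.g. `dot -Tpng flow.dot -o flow.png`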
    // connect the flows by their dependencies; order is not significant
    Cascade cascade = cascadeConnector.connect( hitDailyFlow );

    // execute the cascade, which in turn executes each flow in dependency order
    cascade.complete();
  }
}