Created
May 21, 2009 21:24
-
-
Save Aeon/115755 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Error output from running the job:
$ time hadoop jar ./build/loganalysis.jar | |
09/05/21 13:59:00 INFO flow.MultiMapReducePlanner: using application jar: /home/anton/loganalysis/./build/loganalysis.jar | |
Exception in thread "main" java.lang.NullPointerException | |
at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:211) | |
at cascading.flow.FlowConnector.connect(FlowConnector.java:452) | |
at cascading.flow.FlowConnector.connect(FlowConnector.java:434) | |
at cascading.flow.FlowConnector.connect(FlowConnector.java:406) | |
at cascading.flow.FlowConnector.connect(FlowConnector.java:373) | |
at loganalysis.Main.main(Unknown Source) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) | |
at java.lang.reflect.Method.invoke(Method.java:597) | |
at org.apache.hadoop.util.RunJar.main(RunJar.java:156) | |
real 0m0.819s | |
user 0m1.100s | |
sys 0m0.120s |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package loganalysis; | |
import java.util.Map; | |
import java.util.Properties; | |
import cascading.cascade.Cascade; | |
import cascading.cascade.CascadeConnector; | |
import cascading.cascade.Cascades; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowConnector; | |
import cascading.operation.aggregator.Count; | |
import cascading.operation.aggregator.Sum; | |
import cascading.operation.expression.ExpressionFunction; | |
import cascading.operation.expression.ExpressionFilter; | |
import cascading.operation.regex.RegexParser; | |
import cascading.operation.text.DateParser; | |
import cascading.operation.Debug; | |
import cascading.operation.Identity; | |
import cascading.operation.filter.Not; | |
import cascading.pipe.Each; | |
import cascading.pipe.Every; | |
import cascading.pipe.GroupBy; | |
import cascading.pipe.CoGroup; | |
import cascading.pipe.cogroup.OuterJoin; | |
import cascading.pipe.cogroup.LeftJoin; | |
import cascading.pipe.Pipe; | |
import cascading.scheme.TextLine; | |
import cascading.tap.Hfs; | |
import cascading.tap.Lfs; | |
import cascading.tap.Dfs; | |
import cascading.tap.Tap; | |
import cascading.tap.SinkMode; | |
import cascading.tuple.Fields; | |
import cascading.hbase.*; | |
import loganalysis.OrganicFilter; | |
import loganalysis.LookupDomainFunction; | |
/** | |
* | |
*/ | |
public class Main | |
{ | |
public static void main( String[] args ) | |
{ | |
// set the current job jar | |
Properties properties = new Properties(); | |
FlowConnector.setApplicationJarClass( properties, Main.class ); | |
FlowConnector flowConnector = new FlowConnector( properties ); | |
CascadeConnector cascadeConnector = new CascadeConnector(); | |
String hitPath = "/logs/hits/short-hits-20090504.log"; | |
// create an assembly to import an Apache log file and store on DFS | |
// declares: "time", "method", "event", "status", "size" | |
Fields logFields = new Fields( "day", "urlid", "method" ); | |
String logRegex = "^(.{10}) .{8} .*urlid\\\":(\\d+),.*method\\\":(\\d+),.*$"; | |
int[] logGroups = {1, 2, 3}; | |
RegexParser parser = new RegexParser( logFields, logRegex, logGroups ); | |
Pipe importPipe = new Each( "import", new Fields( "line" ), parser ); | |
// filter out any lines with invalid urlid or method | |
importPipe = new Each( importPipe, new Fields( "urlid", "method" ), new ExpressionFilter( "urlid == null && method == null", String.class ) ); | |
// create tap to read a resource from the local file system | |
Tap hitLogTap = new Dfs( new TextLine(), hitPath ); | |
Pipe organicCountPipe = new Pipe( "organicCount", importPipe ); | |
organicCountPipe = new Each( organicCountPipe, new OrganicFilter() ); | |
organicCountPipe = new GroupBy( organicCountPipe, new Fields( "day", "urlid" ) ); | |
organicCountPipe = new Every( organicCountPipe, Fields.GROUP, new Count() ); | |
Pipe paidCountPipe = new Pipe( "paidCount", importPipe ); | |
paidCountPipe = new Each( paidCountPipe, new Not( new OrganicFilter() ) ); | |
paidCountPipe = new GroupBy( paidCountPipe, new Fields( "day", "urlid" ) ); | |
paidCountPipe = new Every( paidCountPipe, Fields.GROUP, new Count() ); | |
// rename paid counter fields so we can merge them | |
paidCountPipe = new Each( paidCountPipe, new Fields( "day", "urlid", "count" ), new Identity( new Fields( "paid_day", "paid_urlid", "paid_count" ) ) ); | |
Pipe organicDomainCountPipe = new Pipe( "organicDomainCount", organicCountPipe); | |
organicDomainCountPipe = new Each( organicDomainCountPipe, new Fields ("urlid"), new LookupDomainFunction(), Fields.ALL ); | |
organicDomainCountPipe = new GroupBy( organicDomainCountPipe, new Fields( "day", "domainid" ) ); | |
organicDomainCountPipe = new Every( organicDomainCountPipe, new Fields ("count"), new Sum(), Fields.ALL ); | |
Pipe paidDomainCountPipe = new Pipe( "paidDomainCount", paidCountPipe); | |
paidDomainCountPipe = new Each( paidDomainCountPipe, new Fields ("paid_urlid"), new LookupDomainFunction(), Fields.ALL ); | |
paidDomainCountPipe = new GroupBy( paidDomainCountPipe, new Fields( "paid_day", "domainid" ) ); | |
paidDomainCountPipe = new Every( paidDomainCountPipe, new Fields ("paid_count"), new Sum(), Fields.ALL ); | |
// rename paid domain merge fields so we can merge paid and organic domain counters | |
paidDomainCountPipe = new Each( paidDomainCountPipe, new Fields( "paid_day", "domainid", "sum" ), new Identity( new Fields( "paid_day", "paid_domainid", "paid_sum" ) ) ); | |
// merge the organic and paid counter pipes | |
Fields lhsFields = new Fields( "day", "urlid" ); | |
Fields rhsFields = new Fields( "paid_day", "paid_urlid" ); | |
Pipe merge = new Pipe( "url_merge" ); | |
merge = new CoGroup( organicCountPipe, lhsFields, paidCountPipe, rhsFields, new OuterJoin() ); | |
// create a composite field to use as hbase key | |
merge = new Each( merge, new Fields ( "urlid", "day" ), new ExpressionFunction( new Fields( "urlid_day" ), "urlid + \":\" + day", String.class ), Fields.ALL ); | |
// get rid of any null fields | |
merge = new Each( merge, new Fields ( "count" ), new ExpressionFunction( new Fields( "fixed_count" ), "count == null ? \"0\" : count", String.class ), Fields.ALL ); | |
merge = new Each( merge, new Fields ( "paid_count" ), new ExpressionFunction( new Fields( "fixed_paid_count" ), "paid_count == null ? \"0\" : paid_count", String.class ), Fields.ALL ); | |
// discard all fields we don't need, and rename the count and paid count to hbase field names | |
merge = new Each( merge, new Fields( "urlid_day", "fixed_count", "fixed_paid_count" ), new Identity( new Fields ( "urlid_day", "default:organic_hits", "default:paid_hits" ) ) ); | |
// merge the organic and paid domain counter pipes | |
Fields lhsDMFields = new Fields( "day", "domainid" ); | |
Fields rhsDMFields = new Fields( "paid_day", "paid_domainid" ); | |
Pipe domainMerge = new Pipe( "domain_merge" ); | |
domainMerge = new CoGroup( organicDomainCountPipe, lhsDMFields, paidDomainCountPipe, rhsDMFields, new OuterJoin() ); | |
// create a composite field to use as hbase key | |
domainMerge = new Each( domainMerge, new Fields ( "domainid", "day" ), new ExpressionFunction( new Fields( "domainid_day" ), "domainid + \":\" + day", String.class ), Fields.ALL ); | |
// get rid of any null fields | |
domainMerge = new Each( domainMerge, new Fields ( "sum" ), new ExpressionFunction( new Fields( "fixed_sum" ), "sum == null ? \"0\" : sum", String.class ), Fields.ALL ); | |
domainMerge = new Each( domainMerge, new Fields ( "paid_sum" ), new ExpressionFunction( new Fields( "fixed_paid_sum" ), "paid_sum == null ? \"0\" : paid_sum", String.class ), Fields.ALL ); | |
// discard all fields we don't need, and rename the count and paid count to hbase field names | |
domainMerge = new Each( domainMerge, new Fields( "domainid_day", "fixed_sum", "fixed_paid_sum" ), new Identity( new Fields ( "domainid_day", "default:organic_hits", "default:paid_hits" ) ) ); | |
Fields keyFields = new Fields( "urlid_day" ); | |
Fields[] valueFields = new Fields[]{new Fields( "default:organic_hits", "default:paid_hits" ) }; | |
Tap logHBaseSinkTap = new HBaseTap("url_hit_summary", new HBaseScheme(keyFields, valueFields) ); | |
Fields domainKeyFields = new Fields( "domainid_day" ); | |
Fields[] domainValueFields = new Fields[]{new Fields( "default:organic_hits", "default:paid_hits" ) }; | |
Tap domainHBaseSinkTap = new HBaseTap("domain_hit_summary", new HBaseScheme(domainKeyFields, domainValueFields) ); | |
Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( merge, domainMerge ), Tap.taps( logHBaseSinkTap, domainHBaseSinkTap ) ); | |
Flow hitDailyFlow = flowConnector.connect( hitLogTap, sinks, Pipe.pipes(merge, domainMerge) ); | |
// optionally print out the arrivalRateFlow to a graph file for import into a graphics package | |
hitDailyFlow.writeDOT( "flow.dot" ); | |
// connect the flows by their dependencies, order is not significant | |
Cascade cascade = cascadeConnector.connect( hitDailyFlow ); | |
// execute the cascade, which in turn executes each flow in dependency order | |
cascade.complete(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment