Created
April 26, 2012 19:15
-
-
Save fbrubacher/2502186 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class App | |
{ | |
public static Flow getFinalizingJob() | |
{ | |
Scheme sourceScheme = new TextLine( new Fields( "line" ) ); | |
Tap source = new Hfs( sourceScheme, "results2" ); | |
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) ); | |
Tap sink = new Hfs( sinkScheme, "results3", SinkMode.REPLACE ); | |
Pipe assembly = new Pipe( "wordcount" ); | |
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"; | |
Function function = new RegexGenerator( new Fields( "word" ), regex ); | |
assembly = new Each( assembly, new Fields( "line" ), function ); | |
assembly = new GroupBy( assembly, new Fields( "word" ) ); | |
Aggregator count = new Count( new Fields( "count" ) ); | |
assembly = new Every( assembly, count ); | |
Properties properties = new Properties(); | |
FlowConnector.setApplicationJarClass( properties, App.class ); | |
FlowConnector flowConnector = new FlowConnector( properties ); | |
Flow flow = flowConnector.connect( "job3", source, sink, assembly ); | |
return flow; | |
} | |
public static Flow initializingFlow() | |
{ | |
Scheme sourceScheme = new TextLine( new Fields( "line" ) ); | |
Tap source = new Hfs( sourceScheme, "alice_in_wonderland.txt" ); | |
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) ); | |
Tap sink = new Hfs( sinkScheme, "results", SinkMode.REPLACE ); | |
Pipe assembly = new Pipe( "wordcount" ); | |
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"; | |
Function function = new RegexGenerator( new Fields( "word" ), regex ); | |
assembly = new Each( assembly, new Fields( "line" ), function ); | |
assembly = new GroupBy( assembly, new Fields( "word" ) ); | |
Aggregator count = new Count( new Fields( "count" ) ); | |
assembly = new Every( assembly, count ); | |
Properties properties = new Properties(); | |
FlowConnector.setApplicationJarClass( properties, App.class ); | |
FlowConnector flowConnector = new FlowConnector( properties ); | |
Flow flow = flowConnector.connect( "job1", source, sink, assembly ); | |
return flow; | |
} | |
public static void main( String[] args ) | |
{ | |
MyRiffle pr = new MyRiffle(0, "results", "results2", 0); | |
ProcessFlow<MyRiffle> prFlow = new ProcessFlow<MyRiffle>("job2", pr); | |
CascadeConnector connector = new CascadeConnector(); | |
Cascade cascade = connector.connect(initializingFlow(), prFlow, getFinalizingJob()); | |
cascade.complete(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany.app; | |
import java.io.IOException; | |
import riffle.process.DependencyIncoming; | |
import riffle.process.DependencyOutgoing; | |
import riffle.process.ProcessComplete; | |
import riffle.process.ProcessStart; | |
import riffle.process.ProcessStop; | |
/** | |
* | |
*/ | |
@riffle.process.Process | |
public class MyRiffle | |
{ | |
String incoming; | |
String outgoing; | |
public MyRiffle( int id, String incoming, String outgoing, long delay ) | |
{ | |
this.incoming = incoming; | |
this.outgoing = outgoing; | |
} | |
@DependencyOutgoing | |
public String getOutgoing() | |
{ | |
return outgoing; | |
} | |
@DependencyIncoming | |
public String getIncoming() | |
{ | |
return incoming; | |
} | |
@ProcessStart | |
public void start() | |
{ | |
} | |
@ProcessComplete | |
public void complete() | |
{ | |
start(); | |
} | |
@ProcessStop | |
public void stop() | |
{ | |
} | |
@Override | |
public String toString() | |
{ | |
return "hi"; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment