Skip to content

Instantly share code, notes, and snippets.

@fbrubacher
Created April 26, 2012 19:15
Show Gist options
  • Save fbrubacher/2502186 to your computer and use it in GitHub Desktop.
Save fbrubacher/2502186 to your computer and use it in GitHub Desktop.
/**
 * Demo driver that chains three stages into a Cascading {@link Cascade}:
 * a word-count flow ("job1"), a riffle {@link ProcessFlow} wrapping {@link MyRiffle} ("job2"),
 * and a second word-count flow ("job3").
 */
public class App
{
    /**
     * Matches tokens that start and end with a Unicode letter, contain no spaces, and are not
     * directly preceded or followed by another letter.
     */
    private static final String WORD_REGEX = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";

    /**
     * Builds the final word-count flow, named "job3", which reads "results2" and writes
     * (word, count) lines to "results3".
     *
     * @return the connected, not-yet-run flow
     */
    public static Flow getFinalizingJob()
    {
        return wordCountFlow( "job3", "results2", "results3" );
    }

    /**
     * Builds the initial word-count flow, named "job1", which reads "alice_in_wonderland.txt"
     * and writes (word, count) lines to "results".
     *
     * @return the connected, not-yet-run flow
     */
    public static Flow initializingFlow()
    {
        return wordCountFlow( "job1", "alice_in_wonderland.txt", "results" );
    }

    /**
     * Wires job1, the riffle process (job2) and job3 into a cascade and blocks on
     * {@code cascade.complete()}.
     *
     * @param args unused
     */
    public static void main( String[] args )
    {
        // The process declares "results" as incoming and "results2" as outgoing; presumably the
        // CascadeConnector uses these to order it between job1 and job3.
        // NOTE(review): no visible code writes "results2", which job3 reads — confirm it exists.
        MyRiffle pr = new MyRiffle( 0, "results", "results2", 0 );
        ProcessFlow<MyRiffle> prFlow = new ProcessFlow<MyRiffle>( "job2", pr );

        CascadeConnector connector = new CascadeConnector();
        Cascade cascade = connector.connect( initializingFlow(), prFlow, getFinalizingJob() );
        cascade.complete();
    }

    /**
     * Builds a word-count flow. Each line of {@code sourcePath} is split into words with
     * {@link #WORD_REGEX}, the words are grouped and counted, and the (word, count) pairs are
     * written to {@code sinkPath} using {@code SinkMode.REPLACE}.
     *
     * @param flowName   name given to the connected flow
     * @param sourcePath Hfs path of the text input
     * @param sinkPath   Hfs path of the (word, count) output
     * @return the connected, not-yet-run flow
     */
    private static Flow wordCountFlow( String flowName, String sourcePath, String sinkPath )
    {
        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, sourcePath );
        Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, sinkPath, SinkMode.REPLACE );

        Pipe assembly = new Pipe( "wordcount" );
        Function function = new RegexGenerator( new Fields( "word" ), WORD_REGEX );
        assembly = new Each( assembly, new Fields( "line" ), function );
        assembly = new GroupBy( assembly, new Fields( "word" ) );
        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );

        Properties properties = new Properties();
        // Presumably lets Cascading locate the jar that contains App when submitting the job.
        FlowConnector.setApplicationJarClass( properties, App.class );
        FlowConnector flowConnector = new FlowConnector( properties );
        return flowConnector.connect( flowName, source, sink, assembly );
    }
}
package com.mycompany.app;
import java.io.IOException;
import java.util.Objects;

import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.ProcessComplete;
import riffle.process.ProcessStart;
import riffle.process.ProcessStop;
/**
 * Riffle process that only declares its data dependencies. It names an incoming path and an
 * outgoing path, but its lifecycle methods perform no work.
 * <p>
 * NOTE(review): nothing in this class writes the outgoing path. Downstream consumers presumably
 * expect it to be produced elsewhere — confirm.
 */
@riffle.process.Process
public class MyRiffle
{
    /** Caller-supplied identifier; only reported by {@link #toString()}. */
    private final int id;
    /** Path declared as this process's incoming dependency. */
    private final String incoming;
    /** Path declared as this process's outgoing dependency. */
    private final String outgoing;
    /** Caller-supplied delay; stored but not acted on (units unknown — TODO confirm). */
    private final long delay;

    /**
     * Creates the process.
     *
     * @param id       identifier, reported by {@link #toString()}
     * @param incoming path declared as the incoming dependency; must not be null
     * @param outgoing path declared as the outgoing dependency; must not be null
     * @param delay    delay value; currently stored but not acted on
     * @throws NullPointerException if {@code incoming} or {@code outgoing} is null
     */
    public MyRiffle( int id, String incoming, String outgoing, long delay )
    {
        this.id = id;
        // Fail fast: a null dependency path would only surface later, during cascade planning.
        this.incoming = Objects.requireNonNull( incoming, "incoming" );
        this.outgoing = Objects.requireNonNull( outgoing, "outgoing" );
        this.delay = delay;
    }

    /**
     * Returns the path this process declares as its output.
     *
     * @return the outgoing dependency path, never null
     */
    @DependencyOutgoing
    public String getOutgoing()
    {
        return outgoing;
    }

    /**
     * Returns the path this process declares as its input.
     *
     * @return the incoming dependency path, never null
     */
    @DependencyIncoming
    public String getIncoming()
    {
        return incoming;
    }

    /** Starts the process. This stub performs no work. */
    @ProcessStart
    public void start()
    {
        // No work is performed here.
    }

    /**
     * Runs the process to completion by delegating to {@link #start()}. Because
     * {@code start()} does nothing, this returns immediately.
     */
    @ProcessComplete
    public void complete()
    {
        start();
    }

    /** Stops the process. This is a no-op because nothing is ever running. */
    @ProcessStop
    public void stop()
    {
    }

    /**
     * Describes this process and its declared dependencies.
     *
     * @return a diagnostic string containing id, incoming, outgoing and delay
     */
    @Override
    public String toString()
    {
        return "MyRiffle{id=" + id
            + ", incoming=" + incoming
            + ", outgoing=" + outgoing
            + ", delay=" + delay + "}";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment