@cwensel
Last active August 29, 2015 14:04
A prototype fluent API for Cascading
// Factories for all Operations (Functions, Filters, Aggregators, and Buffers)
Function splitter = Fluid.function()
  .RegexSplitter()
  .fieldDeclaration( fields( "num", "char" ) )
  .patternString( " " )
  .end();

// An assembly builder chaining Pipes into complex assemblies
AssemblyBuilder.Start assembly = Fluid.assembly();

Pipe pipeLower = assembly
  .startBranch( "lower" )
  .each( fields( "line" ) ).function( splitter ).outgoing( Fields.RESULTS )
  .completeBranch();

Pipe pipeUpper = assembly
  .startBranch( "upper" )
  .each( fields( "line" ) ).function( splitter ).outgoing( Fields.RESULTS )
  .completeBranch();
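
// Join the two branches on "num"; the declared fields rename the joined
// columns because both sides carry the same field names
Pipe coGroup = assembly
  .startCoGroup()
  .lhs( pipeLower ).lhsGroupFields( fields( "num" ) )
  .rhs( pipeUpper ).rhsGroupFields( fields( "num" ) )
  .declaredFields( fields( "num1", "char1", "num2", "char2" ) )
  .joiner( new OuterJoin() )
  .createCoGroup();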
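
// Continue from the join: keep only the left-hand columns and restore
// their original names
assembly
  .continueBranch( "result", coGroup )
  .retain( fields( "num1", "char1" ) )
  .rename( Fields.ALL, fields( "num", "char" ) )
  .completeBranch();

// Collect the tail pipes of the finished assembly
Pipe[] tails = assembly.completeAssembly();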
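
// Bind sources and the sink to the branches by name; sourceLower,
// sourceUpper, and sink are assumed to be Taps defined elsewhere
FlowDef flowDef = flowDef()
  .addSource( "lower", sourceLower )
  .addSource( "upper", sourceUpper )
  .addSink( "result", sink )
  .addTails( tails );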