Last active
August 29, 2015 14:15
-
-
Save redshiftetl/9e6f821e6319cbf49185 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.IOException; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowDef; | |
import cascading.flow.local.LocalFlowConnector; | |
import cascading.operation.Identity; | |
import cascading.pipe.Each; | |
import cascading.pipe.Pipe; | |
import cascading.scheme.local.TextDelimited; | |
import cascading.tap.local.FileTap; | |
import cascading.tuple.Fields; | |
public class CascadingTestCase { | |
public static void main(String[] args) throws IOException { | |
FlowDef flowDef = new FlowDef(); | |
// Input | |
File inFile = new File("./input.txt"); | |
FileTap source = new FileTap(new TextDelimited(false, "\t"), inFile.getAbsolutePath()); | |
flowDef.addSource("firstPipe", source); | |
// First pipe and trap | |
// Simple pass-through of all records | |
Pipe firstPipe = new Each(new Pipe("firstPipe"), Fields.ALL, new Identity()); | |
FileTap firstPipeTrapPath = new FileTap(new TextDelimited(false, "\t"), "./goodPath.txt"); | |
flowDef.addTrap("firstPipe", firstPipeTrapPath); | |
// Second pipe and trap | |
// An exception is thrown for all tuples that pass through this pipe, but the trap's tap can't be written to | |
Pipe secondPipe = new Each(new Pipe("secondPipe", firstPipe), Fields.ALL, new ExceptionThrowingFunction()); | |
FileTap secondPipeTrapPath = new FileTap(new TextDelimited(false, "\t"), "/this/path/wont/work.txt"); | |
flowDef.addTrap("secondPipe", secondPipeTrapPath); | |
// Output | |
// Tuples won't reach this point because they're all trapped by the previous pipe | |
File outFile = File.createTempFile("test", ".txt"); | |
FileTap outTap = new FileTap(new TextDelimited(false, "\t"), outFile.getAbsolutePath()); | |
flowDef.addTail(secondPipe); | |
flowDef.addSink(secondPipe, outTap); | |
@SuppressWarnings("rawtypes") | |
Flow flow = new LocalFlowConnector().connect(flowDef); | |
flow.writeDOT("./steps.dot"); | |
flow.complete(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cascading.flow.FlowProcess; | |
import cascading.operation.BaseOperation; | |
import cascading.operation.Function; | |
import cascading.operation.FunctionCall; | |
@SuppressWarnings("rawtypes") | |
class ExceptionThrowingFunction extends BaseOperation implements Function { | |
@Override public void operate(FlowProcess flowProcess, FunctionCall functionCall) { | |
throw new RuntimeException(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
input | |
data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment