Skip to content

Instantly share code, notes, and snippets.

@redshiftetl
Last active August 29, 2015 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save redshiftetl/9e6f821e6319cbf49185 to your computer and use it in GitHub Desktop.
Save redshiftetl/9e6f821e6319cbf49185 to your computer and use it in GitHub Desktop.
import java.io.File;
import java.io.IOException;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
public class CascadingTestCase {
public static void main(String[] args) throws IOException {
FlowDef flowDef = new FlowDef();
// Input
File inFile = new File("./input.txt");
FileTap source = new FileTap(new TextDelimited(false, "\t"), inFile.getAbsolutePath());
flowDef.addSource("firstPipe", source);
// First pipe and trap
// Simple pass-through of all records
Pipe firstPipe = new Each(new Pipe("firstPipe"), Fields.ALL, new Identity());
FileTap firstPipeTrapPath = new FileTap(new TextDelimited(false, "\t"), "./goodPath.txt");
flowDef.addTrap("firstPipe", firstPipeTrapPath);
// Second pipe and trap
// An exception is thrown for all tuples that pass through this pipe, but the trap's tap can't be written to
Pipe secondPipe = new Each(new Pipe("secondPipe", firstPipe), Fields.ALL, new ExceptionThrowingFunction());
FileTap secondPipeTrapPath = new FileTap(new TextDelimited(false, "\t"), "/this/path/wont/work.txt");
flowDef.addTrap("secondPipe", secondPipeTrapPath);
// Output
// Tuples won't reach this point because they're all trapped by the previous pipe
File outFile = File.createTempFile("test", ".txt");
FileTap outTap = new FileTap(new TextDelimited(false, "\t"), outFile.getAbsolutePath());
flowDef.addTail(secondPipe);
flowDef.addSink(secondPipe, outTap);
@SuppressWarnings("rawtypes")
Flow flow = new LocalFlowConnector().connect(flowDef);
flow.writeDOT("./steps.dot");
flow.complete();
}
}
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
@SuppressWarnings("rawtypes")
class ExceptionThrowingFunction extends BaseOperation implements Function {
@Override public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
throw new RuntimeException();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment