Skip to content

Instantly share code, notes, and snippets.

@llasram
Created February 6, 2012 20:19
Show Gist options
  • Save llasram/1754564 to your computer and use it in GitHub Desktop.
Save llasram/1754564 to your computer and use it in GitHub Desktop.
Filtering Cascading sink tap
package com.damballa.cascading;
import java.io.IOException;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.SinkTap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
/**
 * Sink tap that wraps another {@link Tap} and only forwards a tuple to it
 * when every value selected by the configured filter fields is neither
 * {@code null} nor boolean {@code false}.
 *
 * <p>All path- and lifecycle-related operations are delegated unchanged to
 * the wrapped tap; only {@link #sink(TupleEntry, OutputCollector)} applies
 * the filter.
 */
public class FilterSinkTap extends SinkTap {

    /** The tap that actually receives the tuples which pass the filter. */
    private final Tap tap;

    /** Fields whose values decide whether a tuple is kept. */
    private final Fields filterFields;

    // NOTE(review): this field is never read inside this class. It is kept
    // only to avoid changing the class's shape; confirm whether it can be
    // removed.
    private String tempPath = "__filtersink_placeholder"
        + Integer.toString((int) (System.currentTimeMillis() * Math.random()));

    /**
     * Creates a filtering wrapper around {@code tap}, reusing the wrapped
     * tap's scheme and sink mode.
     *
     * @param tap          the tap to delegate to
     * @param filterFields the fields inspected by the filter
     */
    public FilterSinkTap(Tap tap, Fields filterFields) {
        super(tap.getScheme(), tap.getSinkMode());
        this.tap = tap;
        this.filterFields = filterFields;
    }

    /** Delegates to the wrapped tap. */
    @Override
    public boolean isWriteDirect() {
        return tap.isWriteDirect();
    }

    /** Delegates to the wrapped tap. */
    @Override
    public Path getPath() {
        return tap.getPath();
    }

    /**
     * Delegates to the wrapped tap.
     *
     * <p>NOTE(review): the collector returned here is the wrapped tap's own;
     * whether tuples written through it are subject to this class's filter
     * is not evident from this file -- confirm against the Tap contract.
     *
     * @throws IOException if the wrapped tap fails to open
     */
    @Override
    public TupleEntryCollector openForWrite(JobConf conf) throws IOException {
        return tap.openForWrite(conf);
    }

    /**
     * Delegates to the wrapped tap.
     *
     * @throws IOException if the wrapped tap's initialization fails
     */
    @Override
    public void sinkInit(JobConf conf) throws IOException {
        tap.sinkInit(conf);
    }

    /**
     * Delegates to the wrapped tap.
     *
     * @throws IOException if the wrapped tap reports an I/O failure
     */
    @Override
    public boolean makeDirs(JobConf conf) throws IOException {
        return tap.makeDirs(conf);
    }

    /**
     * Delegates to the wrapped tap.
     *
     * @throws IOException if the wrapped tap reports an I/O failure
     */
    @Override
    public boolean deletePath(JobConf conf) throws IOException {
        return tap.deletePath(conf);
    }

    /**
     * Delegates to the wrapped tap.
     *
     * @throws IOException if the wrapped tap reports an I/O failure
     */
    @Override
    public boolean pathExists(JobConf conf) throws IOException {
        return tap.pathExists(conf);
    }

    /**
     * Delegates to the wrapped tap.
     *
     * @throws IOException if the wrapped tap reports an I/O failure
     */
    @Override
    public long getPathModified(JobConf conf) throws IOException {
        return tap.getPathModified(conf);
    }

    /** Returns a description containing the wrapped tap and the filter fields. */
    @Override
    public String toString() {
        return "FilterSinkTap[" + tap + ',' + filterFields + ']';
    }

    /**
     * Two filter taps are equal when the superclass considers them equal,
     * they wrap equal taps, and they filter on equal fields.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FilterSinkTap)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        FilterSinkTap that = (FilterSinkTap) o;
        // The original returned false when the wrapped taps WERE equal,
        // inverting the comparison and breaking the equals/hashCode contract.
        if (!tap.equals(that.tap)) {
            return false;
        }
        // Null-safe comparison: the constructor does not reject a null
        // filterFields.
        return filterFields == null
            ? that.filterFields == null
            : filterFields.equals(that.filterFields);
    }

    /** Hash code consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + tap.hashCode();
        result = 31 * result + (filterFields == null ? 0 : filterFields.hashCode());
        return result;
    }

    /**
     * Forwards {@code tupleEntry} to the wrapped tap's {@code sink} method
     * only if it passes the filter; otherwise the entry is dropped silently.
     *
     * @throws IOException if the wrapped tap fails while sinking the entry
     */
    @Override
    public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
            throws IOException {
        if (isKeep(tupleEntry)) {
            tap.sink(tupleEntry, outputCollector);
        }
    }

    /**
     * Decides whether an entry passes the filter.
     *
     * @return {@code false} if any value selected by the filter fields is
     *         {@code null} or a boolean {@code false}; {@code true} otherwise
     */
    private boolean isKeep(TupleEntry tupleEntry) {
        Tuple values = tupleEntry.selectTuple(filterFields);
        for (Object value : values) {
            // Value comparison rather than identity (==): a Boolean false
            // that is not the canonical Boolean.FALSE instance must still
            // be filtered out.
            if (value == null || Boolean.FALSE.equals(value)) {
                return false;
            }
        }
        return true;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment