Created
February 6, 2012 20:19
-
-
Save llasram/1754564 to your computer and use it in GitHub Desktop.
Filtering Cascading sink tap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.damballa.cascading; | |
import java.io.IOException; | |
import cascading.scheme.Scheme; | |
import cascading.tap.Tap; | |
import cascading.tap.SinkTap; | |
import cascading.tuple.Fields; | |
import cascading.tuple.Tuple; | |
import cascading.tuple.TupleEntry; | |
import cascading.tuple.TupleEntryCollector; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapred.JobConf; | |
import org.apache.hadoop.mapred.OutputCollector; | |
public class FilterSinkTap extends SinkTap { | |
private Tap tap; | |
private Fields filterFields; | |
private String tempPath = "__filtersink_placeholder" | |
+ Integer.toString((int) (System.currentTimeMillis() * Math.random())); | |
public | |
FilterSinkTap(Tap tap, Fields filterFields) { | |
super(tap.getScheme(), tap.getSinkMode()); | |
this.tap = tap; | |
this.filterFields = filterFields; | |
} | |
@Override | |
public boolean | |
isWriteDirect() { | |
return tap.isWriteDirect(); | |
} | |
@Override | |
public Path | |
getPath() { | |
return tap.getPath(); | |
} | |
@Override | |
public TupleEntryCollector | |
openForWrite(JobConf conf) throws IOException { | |
return tap.openForWrite(conf); | |
} | |
@Override | |
public void | |
sinkInit(JobConf conf) throws IOException { | |
tap.sinkInit(conf); | |
} | |
@Override | |
public boolean | |
makeDirs(JobConf conf) throws IOException { | |
return tap.makeDirs(conf); | |
} | |
@Override | |
public boolean | |
deletePath(JobConf conf) throws IOException { | |
return tap.deletePath(conf); | |
} | |
@Override | |
public boolean | |
pathExists(JobConf conf) throws IOException { | |
return tap.pathExists(conf); | |
} | |
@Override | |
public long | |
getPathModified(JobConf conf) throws IOException { | |
return tap.getPathModified(conf); | |
} | |
@Override | |
public String | |
toString() { | |
return "FilterSinkTap[" + tap + ',' + filterFields + ']'; | |
} | |
@Override | |
public boolean | |
equals(Object o) { | |
if (this == o) | |
return true; | |
if (!(o instanceof FilterSinkTap)) | |
return false; | |
if (!super.equals(o)) | |
return false; | |
FilterSinkTap that = (FilterSinkTap) o; | |
if (tap.equals(that.tap)) | |
return false; | |
return true; | |
} | |
@Override | |
public int | |
hashCode() { | |
int result = super.hashCode(); | |
result = 31 * result + tap.hashCode(); | |
return result; | |
} | |
@Override | |
public void | |
sink(TupleEntry tupleEntry, OutputCollector outputCollector) | |
throws IOException { | |
if (isKeep(tupleEntry)) { | |
tap.sink(tupleEntry, outputCollector); | |
} | |
} | |
private boolean | |
isKeep(TupleEntry tupleEntry) { | |
Tuple values = tupleEntry.selectTuple(filterFields); | |
for (Object x: values) { | |
if (x == null || x == Boolean.FALSE) | |
return false; | |
} | |
return true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment