Skip to content

Instantly share code, notes, and snippets.

@vrajat
Created December 16, 2014 09:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vrajat/b2acbac3bdbaaa52443e to your computer and use it in GitHub Desktop.
Save vrajat/b2acbac3bdbaaa52443e to your computer and use it in GitHub Desktop.
Optimized S3 Listing in Java using Qubole's jar
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.InputPathProcessor;
import org.apache.hadoop.mapred.JobConf;
//Supporting class for listLocatedStatus
class MultiPathFilter implements PathFilter {
protected static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
public static PathFilter getHiddenFileFilter() {
return hiddenFileFilter;
}
}
protected FileStatus[] listStatus(Path path) throws IOException {
List<Path> inputPaths = new ArrayList<Path>();
inputPaths.add(path);
LOG.info("Input paths to process:" + inputPaths.size());
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(MultiPathFilter.getHiddenFileFilter());
final PathFilter inputFilter = new MultiPathFilter(filters);
JobConf job = new JobConf();
//Turning on optimization
job.setInt("fs.s3.inputpathprocessor.minsize", 0);
LOG.info("S3 optimization turned ON");
InputPathProcessor ipp = new InputPathProcessor(job, inputFilter, inputPaths);
LOG.info("InputPathProcessor initialized");
long t1 = System.nanoTime();
ipp.compute();
LOG.info("computeLocatedFileStatus took " + (System.nanoTime() - t1) / Math.pow(10, 9));
List<LocatedFileStatus> result = ipp.getLocatedFileStatus();
LOG.info("Total result paths to process : " + result.size());
return result.toArray(new LocatedFileStatus[result.size()]);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment