Skip to content

Instantly share code, notes, and snippets.

Created April 11, 2013 15:51
Show Gist options
  • Save anonymous/5364554 to your computer and use it in GitHub Desktop.
Save anonymous/5364554 to your computer and use it in GitHub Desktop.
InputFormat
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
public class LimitedDailyLogFormat {
    /**
     * Matches paths whose string form ends in a {@code dddd/dd/dd} directory triple,
     * e.g. {@code .../2013/04/11}. Only the digit counts are checked; the digits are not
     * validated as a real calendar date.
     */
    private static final Pattern DAILY_PATH_PATTERN = Pattern.compile(".*\\d{4}/\\d{2}/\\d{2}$");

    /** Accepts paths whose full {@code toString()} matches {@link #DAILY_PATH_PATTERN}. */
    private static final Predicate<Path> IS_DAILY_PATH = new Predicate<Path>() {
        @Override
        public boolean apply(Path input) {
            return DAILY_PATH_PATTERN.matcher(input.toString()).matches();
        }
    };

    /**
     * Set paths for the map reduce job.
     *
     * <p>Convenience overload that wraps {@code path} in a Hadoop {@link Path} and delegates to
     * {@link #setInputPaths(Job, Path, Integer)}.
     *
     * @param job the job whose input paths are replaced
     * @param path root path to search for daily directories
     * @param limit maximum number of input paths; must be non-null and positive
     * @throws IOException if the filesystem cannot be queried
     */
    public static void setInputPaths(Job job, String path, Integer limit) throws IOException {
        setInputPaths(job, new Path(path), limit);
    }

    /**
     * Set paths for the map reduce job.
     *
     * <p>Recursively collects every directory under {@code path} whose full path ends in a
     * {@code dddd/dd/dd} triple. It sorts them in reverse natural {@link Path} order, keeps at
     * most {@code limit} of them, and installs them as the job's input paths, replacing any
     * previously configured input paths.
     *
     * @param job the job whose input paths are replaced
     * @param path root directory to search; may be qualified with any filesystem scheme
     * @param limit maximum number of input paths; must be non-null and positive
     * @throws IOException if the filesystem cannot be queried
     * @throws NullPointerException if {@code limit} is null
     * @throws IllegalArgumentException if {@code limit} is not positive, if {@code path} is
     *     missing or not a directory, or if no daily directories are found under it
     */
    public static void setInputPaths(Job job, Path path, Integer limit) throws IOException {
        Preconditions.checkNotNull(limit, "limit must not be null");
        Preconditions.checkArgument(limit > 0, "limit must be positive: %s", limit);

        // Resolve the filesystem from the path itself so that a fully-qualified root on a
        // non-default filesystem works. FileSystem.get(conf) would always return the default one.
        FileSystem fs = path.getFileSystem(job.getConfiguration());

        // Check existence first. getFileStatus throws FileNotFoundException for a missing
        // path, which would otherwise bypass this precondition and its message.
        Preconditions.checkArgument(fs.exists(path) && fs.getFileStatus(path).isDirectory(),
                "path is missing or not a directory: %s", path);

        Iterable<Path> dailyPaths = Iterables.filter(recursivePaths(fs, path), IS_DAILY_PATH);

        // Reverse natural Path order (URI-based in Hadoop). With a single yyyy/MM/dd tree
        // under the root, this puts the latest days first.
        // NOTE(review): if several subtrees live under the root, ordering is dominated by
        // the subtree prefix rather than the date. Confirm this is the intended selection.
        List<Path> descending = Ordering.natural().reverse().sortedCopy(dailyPaths);
        Path[] selected = Iterables.toArray(Iterables.limit(descending, limit), Path.class);

        // FileInputFormat.setInputPaths(Job, Path...) dereferences the first element, so an
        // empty array fails with an obscure ArrayIndexOutOfBoundsException. Fail clearly instead.
        Preconditions.checkArgument(selected.length > 0,
                "no daily (dddd/dd/dd) directories found under: %s", path);
        FileInputFormat.setInputPaths(job, selected);
    }

    /**
     * Recursively find all directories below {@code path}, in pre-order: each directory is
     * followed by its own descendants. {@code path} itself is not included.
     *
     * @param fs the filesystem that {@code path} lives on
     * @param path the directory to start from
     * @return all descendant directories of {@code path}
     * @throws IOException if any directory cannot be listed
     */
    private static List<Path> recursivePaths(FileSystem fs, Path path) throws IOException {
        List<Path> directories = new ArrayList<Path>();
        collectDirectories(fs, path, directories);
        return directories;
    }

    /**
     * Appends every descendant directory of {@code dir} to {@code out} in pre-order.
     * Uses one shared accumulator instead of copying each sub-list into its parent.
     */
    private static void collectDirectories(FileSystem fs, Path dir, List<Path> out)
            throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            if (status.isDirectory()) {
                out.add(status.getPath());
                collectDirectories(fs, status.getPath(), out);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment