Created
April 11, 2013 15:51
-
-
Save anonymous/5364554 to your computer and use it in GitHub Desktop.
InputFormat
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.base.Preconditions; | |
import com.google.common.base.Predicate; | |
import com.google.common.collect.Iterables; | |
import com.google.common.collect.Ordering; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.regex.Pattern; | |
public class LimitedDailyLogFormat { | |
private Integer limit; | |
/** | |
* Set paths for the map reduce job | |
* | |
* @param job the job | |
* @param path root path | |
* @param limit maximum number of input paths | |
* @throws IOException | |
*/ | |
public static void setInputPaths(Job job, String path, Integer limit) throws IOException { | |
setInputPaths(job, new Path(path), limit); | |
} | |
/** | |
* Set paths for the map reduce job | |
* | |
* @param job the job | |
* @param path root path | |
* @param limit maximum number of input paths | |
* @throws IOException | |
*/ | |
public static void setInputPaths(Job job, Path path, Integer limit) throws IOException { | |
FileSystem fs = FileSystem.get(job.getConfiguration()); | |
FileStatus fileStatus = fs.getFileStatus(path); | |
Preconditions.checkArgument(fs.exists(path) && fileStatus.isDirectory(), "path is missing or not a directory: %s", path); | |
Iterable<Path> paths = Iterables.filter( | |
recursivePaths(fs.getConf(), path), | |
new Predicate<Path>() { | |
Pattern pattern = Pattern.compile(".*\\d{4}/\\d{2}/\\d{2}$"); | |
@Override | |
public boolean apply(Path input) { | |
return pattern.matcher(input.toString()).matches(); | |
} | |
}); | |
paths = Iterables.limit(Ordering.natural().reverse().sortedCopy(paths), limit); | |
FileInputFormat.setInputPaths(job, Iterables.toArray(paths, Path.class)); | |
} | |
/** | |
* Recursively find all paths | |
* | |
* @param conf The jobs configuration | |
* @param path The path to start from | |
* @return all paths | |
* @throws IOException | |
*/ | |
private static List<Path> recursivePaths(Configuration conf, Path path) throws IOException { | |
List<Path> paths = new ArrayList<Path>(); | |
for (FileStatus status : path.getFileSystem(conf).listStatus(path)) { | |
if (status.isDirectory()) { | |
paths.add(status.getPath()); | |
paths.addAll(recursivePaths(conf, status.getPath())); | |
} | |
} | |
return paths; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment