Created
June 17, 2013 19:56
-
-
Save vmarcinko/5799790 to your computer and use it in GitHub Desktop.
Helper utilities for Hadoop job submission
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.filecache.DistributedCache; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.*; | |
import org.apache.hadoop.fs.permission.FsPermission; | |
import org.apache.hadoop.mapreduce.JobSubmissionFiles; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.*; | |
import java.lang.reflect.Field; | |
import java.lang.reflect.Modifier; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Map; | |
import java.util.Set; | |
public class JobHelper { | |
private static final Logger logger = LoggerFactory.getLogger(JobHelper.class); | |
private static Class<?> trackerDistributedCacheManagerClass; | |
static { | |
if (System.getProperty("os.name").toLowerCase().startsWith("win")) { | |
try { | |
trackerDistributedCacheManagerClass = Class.forName("org.apache.hadoop.filecache.TrackerDistributedCacheManager"); | |
} catch (ClassNotFoundException e) { | |
trackerDistributedCacheManagerClass = null; | |
logger.warn("Unable to provide Windows JAR permission fix: " + e.getMessage(), e); | |
} | |
} | |
} | |
public static void hackHadoopStagingOnWin() { | |
// do the assignment only on Windows systems | |
if (System.getProperty("os.name").toLowerCase().startsWith("win")) { | |
// 0655 = -rwxr-xr-x | |
JobSubmissionFiles.JOB_DIR_PERMISSION.fromShort((short) 0650); | |
JobSubmissionFiles.JOB_FILE_PERMISSION.fromShort((short) 0650); | |
if (trackerDistributedCacheManagerClass != null) { | |
// handle jar permissions as well | |
Field field = findField(trackerDistributedCacheManagerClass, "PUBLIC_CACHE_OBJECT_PERM"); | |
makeAccessible(field); | |
try { | |
FsPermission perm = (FsPermission) field.get(null); | |
perm.fromShort((short) 0650); | |
} catch (IllegalAccessException e) { | |
throw new RuntimeException("Error while trying to set permission on field: " + field, e); | |
}; | |
} | |
} | |
} | |
private static Field findField(Class<?> clazz, String name) { | |
Class<?> searchType = clazz; | |
while (!Object.class.equals(searchType) && searchType != null) { | |
Field[] fields = searchType.getDeclaredFields(); | |
for (Field field : fields) { | |
if ((name == null || name.equals(field.getName()))) { | |
return field; | |
} | |
} | |
searchType = searchType.getSuperclass(); | |
} | |
return null; | |
} | |
private static void makeAccessible(Field field) { | |
if ((!Modifier.isPublic(field.getModifiers()) || !Modifier.isPublic(field.getDeclaringClass().getModifiers()) || | |
Modifier.isFinal(field.getModifiers())) && !field.isAccessible()) { | |
field.setAccessible(true); | |
} | |
} | |
public static void copyLocalJarsToHdfs(String localJarsDir, String hdfsJarsDir, Configuration configuration) throws IOException { | |
checkRequiredArgument(localJarsDir, "Local JARs dir is null"); | |
checkRequiredArgument(hdfsJarsDir, "HDFS JARs dir is null"); | |
checkRequiredArgument(configuration, "Configuration is null"); | |
Set<File> jarFiles = collectJarFilesFromLocalDir(localJarsDir); | |
if (jarFiles.isEmpty()) { | |
logger.info("No JAR files found for copying to HDFS under local dir: {}", localJarsDir); | |
} else { | |
logger.info("Copying {} JAR files from local dir ({}) to HDFS dir ({} at {})", jarFiles.size(), localJarsDir, hdfsJarsDir, resolveHdfsAddress(configuration)); | |
FileSystem hdfsFileSystem = FileSystem.get(configuration); | |
for (File jarFile : jarFiles) { | |
Path localJarPath = new Path(jarFile.toURI()); | |
Path hdfsJarPath = new Path(hdfsJarsDir, jarFile.getName()); | |
hdfsFileSystem.copyFromLocalFile(false, true, localJarPath, hdfsJarPath); | |
} | |
} | |
} | |
private static void checkRequiredArgument(Object argument, String errorMessage) { | |
if (argument == null) { | |
throw new IllegalArgumentException(errorMessage); | |
} | |
} | |
private static Set<File> collectJarFilesFromLocalDir(String localJarsDirPath) { | |
File directoryFile = new File(localJarsDirPath); | |
if (null == directoryFile) { | |
throw new IllegalArgumentException("No directory found at local path: " + localJarsDirPath); | |
} | |
if (!directoryFile.isDirectory()) { | |
throw new IllegalArgumentException("Path points to file, not directory: " + localJarsDirPath); | |
} | |
Set<File> jarFiles = new HashSet<File>(); | |
for (File libFile : directoryFile.listFiles()) { | |
if (libFile.exists() && !libFile.isDirectory() && libFile.getName().endsWith(".jar")) { | |
jarFiles.add(libFile); | |
} | |
} | |
return jarFiles; | |
} | |
public static void addHdfsJarsToDistributedCache(String hdfsJarsDir, Configuration configuration) throws IOException { | |
checkRequiredArgument(hdfsJarsDir, "HDFS JARs dir is null"); | |
checkRequiredArgument(configuration, "Configuration is null"); | |
Set<Path> jarPaths = collectJarPathsOnHdfs(hdfsJarsDir, configuration); | |
if (!jarPaths.isEmpty()) { | |
logger.info("Adding following JARs to distributed cache: {}", jarPaths); | |
System.setProperty("path.separator", ":"); // due to https://issues.apache.org/jira/browse/HADOOP-9123 | |
for (Path jarPath : jarPaths) { | |
FileSystem jarPathFileSystem = jarPath.getFileSystem(configuration); | |
DistributedCache.addFileToClassPath(jarPath, configuration, jarPathFileSystem); | |
} | |
} | |
} | |
private static Set<Path> collectJarPathsOnHdfs(String hdfsJarsDir, Configuration configuration) throws IOException { | |
Set<Path> jarPaths = new HashSet<Path>(); | |
FileSystem fileSystem = FileSystem.get(configuration); | |
Path jarsDirPath = new Path(hdfsJarsDir); | |
if (!fileSystem.exists(jarsDirPath)) { | |
throw new IllegalArgumentException("Directory '" + hdfsJarsDir + "' doesn't exist on HDFS (" + resolveHdfsAddress(configuration) + ")"); | |
} | |
if (fileSystem.isFile(jarsDirPath)) { | |
throw new IllegalArgumentException("Path '" + hdfsJarsDir + "' on HDFS (" + resolveHdfsAddress(configuration) + ") is file, not directory"); | |
} | |
FileStatus[] fileStatuses = fileSystem.listStatus(jarsDirPath); | |
for (FileStatus fileStatus : fileStatuses) { | |
if (!fileStatus.isDir()) { | |
jarPaths.add(fileStatus.getPath()); | |
} | |
} | |
return jarPaths; | |
} | |
private static String resolveHdfsAddress(Configuration configuration) { | |
return configuration.get("fs.default.name"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment