Here, I'm trying to explain how various file systems (HDFS, S3, EMRFS) interact with Hadoop. Understanding this helps with some of the tricky problems that arise during development, e.g. authentication and performance issues. Hadoop file systems nowadays support a variety of applications; I'll focus specifically on EMRFS and Spark.
Given a URI (e.g. s3://mybucket/objectname), Spark interacts with the Hadoop file system API through the DataSource.write function:
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val outputPath = {
  val path = new Path(caseInsensitiveOptions.getOrElse("path", {
    throw new IllegalArgumentException("'path' is not specified")
  }))
  val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
  path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
Here, Path.getFileSystem enters the Hadoop code base and calls into FileSystem.java. This class looks at the URI, s3://mybucket/objectname in this case, and determines which FileSystem subclass to use. The URI scheme, i.e. s3, is used as the key to look up the subclass in the configuration (fs.s3.impl here); if nothing is configured, Hadoop falls back to the implementations it registered in loadFileSystems:
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}
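The lookup order in that method can be tried out without a Hadoop installation. What follows is only a simplified model, not Hadoop code: plain maps stand in for Configuration and SERVICE_FILE_SYSTEMS, class names are kept as strings, and the names SchemeLookupSketch, resolve, and the example.* class names are all made up for illustration.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the lookup order in getFileSystemClass. Not Hadoop code.
class SchemeLookupSketch {
    // Stands in for SERVICE_FILE_SYSTEMS. The class names are hypothetical.
    static final Map<String, String> SERVICE_FILE_SYSTEMS = new HashMap<>();
    static {
        SERVICE_FILE_SYSTEMS.put("hdfs", "example.ServiceLoadedHdfs");
        SERVICE_FILE_SYSTEMS.put("s3", "example.ServiceLoadedS3");
    }

    // conf stands in for Hadoop's Configuration object.
    static String resolve(String scheme, Map<String, String> conf) throws IOException {
        String clazz = null;
        if (conf != null) {
            // 1. An explicit fs.<scheme>.impl entry takes precedence.
            clazz = conf.get("fs." + scheme + ".impl");
        }
        if (clazz == null) {
            // 2. Otherwise fall back to the registered implementations.
            clazz = SERVICE_FILE_SYSTEMS.get(scheme);
        }
        if (clazz == null) {
            // 3. Nothing knows this scheme.
            throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return clazz;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.s3.impl", "example.ConfiguredS3"); // hypothetical override
        System.out.println(resolve("s3", conf));   // the configured class wins
        System.out.println(resolve("hdfs", conf)); // falls back to the registry
    }
}
```

The point of the model is the precedence: a configured fs.s3.impl value overrides whatever implementation would otherwise be picked up for the s3 scheme.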
To reduce the cost of looking up and creating file system instances, Hadoop uses a cache. Look for the static class Cache in the same file:
/** Caching FileSystem objects */
static class Cache {
The cache key is based on several things:
- The scheme, i.e., "s3".
- The authority. If the URI carries a user name, as in s3://myuser@mybucket/myobject, then a different user name gets a different file system instance even when the scheme is the same.
- The user group information (UserGroupInformation) of the current user.
- An artificial counter that forces the creation of a new file system instance when needed.
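To see what this key composition means in practice, here is a minimal sketch that uses only the JDK. It is a model under stated assumptions, not Hadoop's implementation: a plain string stands in for UserGroupInformation, a HashMap stands in for the cache, and the names CacheKeySketch and get are hypothetical.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

// Simplified model of a cache keyed by scheme, authority, user, and a counter.
class CacheKeySketch {
    static final class Key {
        final String scheme;
        final String authority;
        final String user;  // assumption: a string stands in for UserGroupInformation
        final long unique;  // artificial value to force a distinct key

        Key(URI uri, String user, long unique) {
            this.scheme = uri.getScheme() == null
                ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
            this.authority = uri.getAuthority() == null
                ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
            this.user = user;
            this.unique = unique;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return scheme.equals(k.scheme) && authority.equals(k.authority)
                && user.equals(k.user) && unique == k.unique;
        }

        @Override
        public int hashCode() {
            return Objects.hash(scheme, authority, user, unique);
        }
    }

    // Stands in for the file system cache; values are placeholder objects.
    static final Map<Key, Object> CACHE = new HashMap<>();

    static Object get(String uri, String user) {
        return CACHE.computeIfAbsent(new Key(URI.create(uri), user, 0), k -> new Object());
    }

    public static void main(String[] args) {
        Object a = get("s3://mybucket/a", "alice");
        // Same scheme, authority, and user: the path is not part of the key.
        System.out.println(a == get("s3://mybucket/b", "alice"));        // true
        // A user name in the URI changes the authority, hence the key.
        System.out.println(a == get("s3://myuser@mybucket/a", "alice")); // false
        // A different current user also yields a different instance.
        System.out.println(a == get("s3://mybucket/a", "bob"));          // false
    }
}
```

Note that the object path never enters the key, so in this model every object in the same bucket, accessed by the same user, shares one cached instance.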
/** FileSystem.Cache.Key */
static class Key {
  final String scheme;
  final String authority;
  final UserGroupInformation ugi;
  final long unique; // an artificial way to make a key unique

  Key(URI uri, Configuration conf) throws IOException {
    this(uri, conf, 0);
  }

  Key(URI uri, Configuration conf, long unique) throws IOException {
    scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
    authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
    this.unique = unique;
    this.ugi = UserGroupInformation.getCurrentUser();
  }

  @Override
  public int hashCode() {
    return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
  }