Skip to content

Instantly share code, notes, and snippets.

@smartnose
Last active November 18, 2016 23:10
Show Gist options
  • Save smartnose/6489d68e527cb48282f0aa20c4dd5ab2 to your computer and use it in GitHub Desktop.
Save smartnose/6489d68e527cb48282f0aa20c4dd5ab2 to your computer and use it in GitHub Desktop.

Hadoop File Systems Through Code

Here, I'm trying to explain how various file systems (hdfs, s3, emrfs) interacts with hadoop. Understanding this would help address some of the tricky problems arise during development process, e.g. authentication & performance issues. Hadoop file systems nowadays support a variety of applications. Specifically, I'll focus on EMRFS and Spark.

Given a URI (e.g. s3://mybucket/objectname). Spark interacts with hadoop file system API through DataSource.write function.

       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
        val outputPath = {
          val path = new Path(caseInsensitiveOptions.getOrElse("path", {
            throw new IllegalArgumentException("'path' is not specified")
          }))
          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
        }

Here, Path.getFileSystem enters the realm of hadoop code base, which in turn calls to FileSystem.java. This class looks at the URI, s3://mybucket/objectname in this case, and determine which sub file system to use. The URI schema, i.e. s3 is used as the key to look up the file system subclass type from configuration file:

  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
     Configuration conf) throws IOException {
   if (!FILE_SYSTEMS_LOADED) {
     loadFileSystems();
   }
   Class<? extends FileSystem> clazz = null;
   if (conf != null) {
     clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
   }
   if (clazz == null) {
     clazz = SERVICE_FILE_SYSTEMS.get(scheme);
   }
   if (clazz == null) {
     throw new IOException("No FileSystem for scheme: " + scheme);
   }
   return clazz;
 }

To reduce the cost of looking up and creating file system instances, hadoop used a cache mechanism. Look for the static class Cache in the same file:

/** Caching FileSystem objects */
  static class Cache {

The cache key is based on a number of things:

  1. The scheme, i.e., "s3"
  2. The authority. If the URI has a host/user name such as s3://myuser@mybucket/myobject, then different user name will get a different file system instance even if they have the same scheme.
  3. Local user group information.
  4. An artifical count to force creating a new file system instance if needed.
/** FileSystem.Cache.Key */
    static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique;   // an artificial way to make a key unique

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
        this.unique = unique;
        
        this.ugi = UserGroupInformation.getCurrentUser();
      }

      @Override
      public int hashCode() {
        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
      }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment