
Getting started with EMRFS

The EMR File System (EMRFS) is an implementation of HDFS that all Amazon EMR clusters use for reading and writing regular files from Amazon EMR directly to Amazon S3.

How to access a file from S3 using EMRFS

Using Java

Coming from HDFS, it is very easy to use EMRFS. You just need to pass a URI("s3://<bucket-name>") object when getting the FileSystem object.

package com.joe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;

/*
export CLASSPATH=/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/opt/aws/bin:/etc/hadoop/conf:/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/lib/spark/yarn/lib/datanucleus-api-jdo.jar:/usr/lib/spark/yarn/lib/datanucleus-core.jar:/usr/lib/spark/yarn/lib/datanucleus-rdbms.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/aws-java-sdk/*
 java -cp emrfs-test-1.0-SNAPSHOT.jar:$CLASSPATH com.joe.SimpleS3Operation "s3://<bucket>/" "s3://<bucket>/<text-file-object-path>"
*/
public class SimpleS3Operation {
    public static void main(String[] args) throws IOException, URISyntaxException {
        String s3BucketUri=args[0]; // s3://<bucket>
        String s3ObjectPath=args[1]; // /<text-file-object-path>
        new SimpleS3Operation().readFromS3(s3BucketUri, s3ObjectPath);
    }

    private void readFromS3(String s3BucketUri, String s3ObjectPath) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        URI s3uri = new URI(s3BucketUri);
        // On an EMR cluster the s3:// scheme resolves to EMRFS
        FileSystem fs = FileSystem.get(s3uri, conf);
        Path inFile = new Path(s3ObjectPath);
        StringBuilder outputString = new StringBuilder();
        // try-with-resources closes the S3 input stream and the scanner when done
        try (FSDataInputStream in = fs.open(inFile);
             Scanner sc = new Scanner(in)) {
            while (sc.hasNextLine()) {
                outputString.append(sc.nextLine()).append(System.lineSeparator());
            }
        }
        System.out.println(outputString);
    }
}

Using Java-Spark

There is no configuration change needed in your Spark application. Just make sure the required .jar is added to your classpath.

package com.joe;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
spark-submit --name "S3 Read using spark" --master yarn --deploy-mode cluster --class "com.joe.SimpleSparkOperation" "emrfs-test-1.0-SNAPSHOT.jar" "s3://<bucket>/<text-file-object-path>"
*/
public class SimpleSparkOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSparkOperation.class);

    public static void main(String[] args) {
        String s3path = args[0]; // s3://<bucket>/<text-file-object-path>
        new SimpleSparkOperation().readFromS3(s3path);
    }

    private void readFromS3(String s3Path){
        SparkSession spark = getSparkSession();
        Dataset<Row> file = spark.read().text(s3Path);
        for(Row line: file.collectAsList()){
            System.out.println(line.toString()+"\n"); //printing in stdout
            LOGGER.info(line.toString()+"\n"); //printing in stderr
        }
    }

    private SparkSession getSparkSession(){
        return SparkSession.builder().appName("Simple S3 read").getOrCreate();
    }
}
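
Writing back to S3 goes through EMRFS in exactly the same way. Here is a minimal hedged sketch (not part of the original gist); the class name SimpleSparkS3Write and both S3 paths are placeholders I made up for illustration. spark.read().text() produces a single string column, which is exactly the shape write().text() expects.

package com.joe;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/*
spark-submit --name "S3 Write using spark" --master yarn --deploy-mode cluster --class "com.joe.SimpleSparkS3Write" "emrfs-test-1.0-SNAPSHOT.jar" "s3://<bucket>/<text-file-object-path>" "s3://<bucket>/<output-path>"
*/
public class SimpleSparkS3Write {
    public static void main(String[] args) {
        String inputPath = args[0];  // s3://<bucket>/<text-file-object-path>
        String outputPath = args[1]; // s3://<bucket>/<output-path>
        SparkSession spark = SparkSession.builder().appName("Simple S3 write").getOrCreate();
        // spark.read().text() yields a single string column named "value",
        // which is what write().text() requires
        Dataset<Row> file = spark.read().text(inputPath);
        file.write().mode(SaveMode.Overwrite).text(outputPath);
        spark.stop();
    }
}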

Using CLI

You are probably familiar with the AWS S3 CLI.

But using the Hadoop FileSystem API (via hdfs dfs) you can do a lot of other things as well. It uses EMRFS to read from and write to S3, so it stays compatible with consistent view.

A quick guide to what the CLI commands look like:

# Listing objects
hdfs dfs -ls s3://<bucket>/<object-path>

# Copy from local to s3
hdfs dfs -copyFromLocal <local-file> s3://<bucket>/<object-path>

# Copy to local from s3
hdfs dfs -copyToLocal s3://<bucket>/<object-path> <local-file>

Other challenges you will face with EMRFS

As @Teepika mentioned in a blog -

EMRFS is not a separate file-system, it’s an implementation of HDFS that adds strong consistency to S3, whereas S3 without EMRFS implementation provides read after write consistency for PUTS of new objects and eventual consistency for other operations on objects in all regions.

If you do not enable consistent view

S3 has a caveat with its read-after-write consistency model: if you make a HEAD or GET request to a key name before the object is created and then create the object shortly after that, a subsequent GET might not return the object due to eventual consistency.

If you fire a Hive query to insert data into some S3 location, Hive creates a staging directory .hive-staging_* under that location, and at the end it renames the staging directory to the actual one.

So when Hive PUTs an object to S3, it isn't instantly available for HEAD or GET. Sometimes the rename happens so fast that, due to S3's eventual consistency, the query fails intermittently.
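
To make that failure mode concrete, here is a minimal hedged sketch (not from the original gist) of the same pattern, using the FileSystem API from the Java example above. All paths and the class name are placeholders, and it will not reproduce the race deterministically; it only shows the HEAD-before-PUT plus rename sequence that Hive's staging directory follows.

package com.joe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;

// Hedged illustration only: placeholder paths, hypothetical class name.
public class StagingRenameExample {
    public static void main(String[] args) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("s3://<bucket>/"), conf);

        Path staging = new Path("/<table-path>/.hive-staging_example/part-00000");
        Path target = new Path("/<table-path>/part-00000");

        // HEAD request on a key that does not exist yet -- the trigger for the caveat above
        boolean existedBefore = fs.exists(target);

        // Write to the staging location, then rename to the final location, as Hive does
        try (FSDataOutputStream out = fs.create(staging, true)) {
            out.write("hello from the staging dir".getBytes(StandardCharsets.UTF_8));
        }
        fs.rename(staging, target);

        // Without consistent view this can intermittently report false or fail to read
        System.out.println("existed before: " + existedBefore + ", visible now: " + fs.exists(target));
    }
}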

If you enable consistent view

After you enable consistent view on an EMR cluster, it creates a DynamoDB table to keep track of S3 metadata. Basically it's a key-value entry for each object that EMRFS has written or synced.
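
For reference, consistent view is driven by a few emrfs-site properties (fs.s3.consistent, and fs.s3.consistent.metadata.tableName, which defaults to EmrFSMetadata). The sketch below is a hedged, made-up helper that simply prints them; the emrfs-site.xml path is an assumption based on the EMRFS conf directory that appears in the classpath earlier in this gist.

package com.joe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Hedged sketch: check whether consistent view is enabled on this cluster.
// The emrfs-site.xml location is assumed from /usr/share/aws/emr/emrfs/conf on the classpath.
public class CheckConsistentView {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/usr/share/aws/emr/emrfs/conf/emrfs-site.xml"));
        // fs.s3.consistent toggles consistent view; the metadata table defaults to EmrFSMetadata
        System.out.println("fs.s3.consistent = " + conf.get("fs.s3.consistent", "false"));
        System.out.println("fs.s3.consistent.metadata.tableName = "
                + conf.get("fs.s3.consistent.metadata.tableName", "EmrFSMetadata"));
    }
}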

After you enable this, if you do any PUT operation on an object using the AWS S3 SDK, boto3, or the aws s3 CLI, the metadata won't be in sync. Then whenever you try to perform an EMRFS operation on the same object, it will throw a ConsistencyException, because DynamoDB is out of sync.

And if you did this by mistake, you need to sync the metadata manually:

# See the difference between S3 and the metadata table
emrfs diff s3://<bucket>/<path or path-to-object>

# Sync the metadata table with S3
emrfs sync s3://<bucket>/<path or path-to-object>

It's better to forget about boto3 or the AWS S3 SDK when you are working with EMRFS consistent view.

It also comes with its own DynamoDB cost.
