The EMR File System (EMRFS) is an implementation of the Hadoop file system that all Amazon EMR clusters use for reading and writing regular files from Amazon EMR directly to Amazon S3.
If you are coming from HDFS, switching to EMRFS is very easy: you just pass a URI("s3://<bucket-name>") object when obtaining the FileSystem instance.
package com.joe;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;

/*
export CLASSPATH=/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/opt/aws/bin:/etc/hadoop/conf:/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/lib/spark/yarn/lib/datanucleus-api-jdo.jar:/usr/lib/spark/yarn/lib/datanucleus-core.jar:/usr/lib/spark/yarn/lib/datanucleus-rdbms.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/*:/usr/lib/hadoop-mapreduce/share/hadoop/mapreduce/lib/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/lib/*:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar:/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar:/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/aws-java-sdk/*
java -cp emrfs-test-1.0-SNAPSHOT.jar:$CLASSPATH com.joe.SimpleS3Operation "s3://<bucket>/" "s3://<bucket>/<text-file-object-path>"
*/
public class SimpleS3Operation {

    public static void main(String[] args) throws IOException, URISyntaxException {
        String s3BucketUri = args[0];  // s3://<bucket>
        String s3ObjectPath = args[1]; // /<text-file-object-path>
        new SimpleS3Operation().readFromS3(s3BucketUri, s3ObjectPath);
    }

    private void readFromS3(String s3BucketUri, String s3ObjectPath) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        URI s3uri = new URI(s3BucketUri);
        FileSystem fs = FileSystem.get(s3uri, conf);
        Path inFile = new Path(s3ObjectPath);
        StringBuilder outputString = new StringBuilder();
        // try-with-resources closes the stream and scanner when done
        try (FSDataInputStream in = fs.open(inFile);
             Scanner sc = new Scanner(in)) {
            while (sc.hasNextLine()) {
                outputString.append(sc.nextLine()).append(System.lineSeparator());
            }
        }
        System.out.println(outputString);
    }
}
No configuration changes are needed in your Spark application either. Just make sure the required .jar is added to your classpath.
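If you build the examples with Maven, a dependency along these lines should be enough; the version is illustrative, and the scope is `provided` because an EMR node already ships the Hadoop jars (see the CLASSPATH export above):

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.5</version>
    <scope>provided</scope>
</dependency>
```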
package com.joe;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
spark-submit --name "S3 Read using spark" --master yarn --deploy-mode cluster --class "com.joe.SimpleSparkOperation" "emrfs-test-1.0-SNAPSHOT.jar" "s3://<bucket>/<text-file-object-path>"
*/
public class SimpleSparkOperation {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSparkOperation.class);

    public static void main(String[] args) {
        String s3Path = args[0]; // s3://<bucket>/<text-file-object-path>
        new SimpleSparkOperation().readFromS3(s3Path);
    }

    private void readFromS3(String s3Path) {
        SparkSession spark = getSparkSession();
        Dataset<Row> file = spark.read().text(s3Path);
        for (Row line : file.collectAsList()) {
            System.out.println(line.toString()); // printed to stdout
            LOGGER.info(line.toString());        // printed to stderr (YARN container logs)
        }
    }

    private SparkSession getSparkSession() {
        return SparkSession.builder().appName("Simple S3 read").getOrCreate();
    }
}
You are probably familiar with the AWS S3 CLI.
But with the Hadoop FileSystem API you can do plenty of other things as well. It uses EMRFS to read from and write to S3, so it is more compatible with Consistent View.
A quick guide to what your new CLI commands will look like:
# Listing objects
hdfs dfs -ls s3://<bucket>/<object-path>
# Copy from local to s3
hdfs dfs -copyFromLocal <local-file> s3://<bucket>/<object-path>
# Copy to local from s3
hdfs dfs -copyToLocal s3://<bucket>/<object-path> <local-file>
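The same operations are available programmatically through the FileSystem handle. A minimal sketch (the bucket and paths are placeholders, and this assumes it runs on an EMR node with the EMRFS jars on the classpath, as in the earlier example):

    // equivalent of: hdfs dfs -ls s3://<bucket>/<object-path>
    FileSystem fs = FileSystem.get(new URI("s3://<bucket>/"), new Configuration());
    for (FileStatus status : fs.listStatus(new Path("/<object-path>"))) {
        System.out.println(status.getPath() + "\t" + status.getLen());
    }

    // equivalent of: hdfs dfs -copyFromLocal <local-file> s3://<bucket>/<object-path>
    fs.copyFromLocalFile(new Path("<local-file>"), new Path("/<object-path>"));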
As @Teepika mentioned in a blog:
EMRFS is not a separate file-system, it’s an implementation of HDFS that adds strong consistency to S3, whereas S3 without EMRFS implementation provides read after write consistency for PUTS of new objects and eventual consistency for other operations on objects in all regions.
S3 has a caveat with its read-after-write consistency model that if you make a HEAD or GET request to a key name before the object is created, then create the object shortly after that, a subsequent GET might not return the object due to eventual consistency.
If you fire a Hive query to insert data into some S3 location, Hive creates a staging directory (.hive-staging_*) under that location, and at the end it renames the staging directory to the actual one.
So when Hive PUTs an object to S3, it isn't necessarily available instantly for HEAD or GET. Sometimes the operation happens so fast that, due to S3's eventual consistency, the query fails intermittently.
After you enable Consistent View on an EMR cluster, it creates a DynamoDB table to keep track of S3 metadata. Basically, it's a key-value map with an entry for each and every object in S3.
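Conceptually, that DynamoDB table is just a map from each object's full path to a small metadata record; a toy sketch of the idea in plain Java (the field names here are illustrative, not the actual EMRFS schema):

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataStoreSketch {
    // Illustrative record; the real EMRFS metadata schema differs.
    static final class ObjectMeta {
        final String state;  // e.g. "present" or "deleted"
        final long version;
        ObjectMeta(String state, long version) {
            this.state = state;
            this.version = version;
        }
    }

    // Look up an object's tracked state, as EMRFS would before trusting an S3 listing.
    static String stateOf(Map<String, ObjectMeta> metadata, String key) {
        ObjectMeta meta = metadata.get(key);
        return meta == null ? "unknown" : meta.state + ":" + meta.version;
    }

    public static void main(String[] args) {
        Map<String, ObjectMeta> metadata = new HashMap<>();

        // An EMRFS write records both the S3 PUT and a metadata entry.
        metadata.put("s3://my-bucket/data/part-00000", new ObjectMeta("present", 1));

        // An EMRFS read checks S3 results against this map; a mismatch
        // is what surfaces as a ConsistencyException.
        System.out.println(stateOf(metadata, "s3://my-bucket/data/part-00000"));
    }
}
```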
Note that the table only stays in sync when writes go through EMRFS: if you PUT an object using the AWS S3 SDK or boto3 (or the AWS CLI), the DynamoDB table is not updated. The next time you perform an EMRFS operation on that object, it will throw a ConsistencyException, because DynamoDB is out of sync.
If you did this by mistake, you need to sync manually:
emrfs sync s3://<bucket>/<path or path-to-object>
Or inspect the differences first with:
emrfs diff s3://<bucket>/<path or path-to-object>
It's better to avoid boto3 and the AWS S3 SDK altogether when you are working with EMRFS Consistent View. Also keep in mind that Consistent View comes with its own DynamoDB cost.