@asmaier
Last active March 5, 2021 03:43
PySpark script for downloading a single parquet file from Amazon S3 via the s3a protocol. It also reads the credentials from the ~/.aws/credentials file, so we don't need to hardcode them. See also https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html .
#
# Some constants
#
aws_profile = "your_profile"
aws_region = "your_region"
s3_bucket = "your_bucket"
#
# Reading environment variables from aws credential file
#
import os
import configparser
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get(aws_profile, "aws_access_key_id")
access_key = config.get(aws_profile, "aws_secret_access_key")
#
# Configuring pyspark
#
# see https://github.com/jupyter/docker-stacks/issues/127#issuecomment-214594895
# and https://github.com/radanalyticsio/pyspark-s3-notebook/blob/master/s3-source-example.ipynb
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"
# If this doesn't work you might have to delete your ~/.ivy2 directory to reset your package cache.
# (see https://github.com/databricks/spark-redshift/issues/244#issuecomment-239950148)
import pyspark
sc=pyspark.SparkContext()
# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# see https://stackoverflow.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf=sc._jsc.hadoopConfiguration()
# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)
# see http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos/
hadoop_conf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
#
# Downloading the parquet file
#
sql=pyspark.sql.SparkSession(sc)
path = s3_bucket + "/your_path"
dataS3=sql.read.parquet("s3a://" + path)
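A quick check of the result (not part of the original gist) can confirm the read worked; dataS3 is the DataFrame created above:
# Inspect the schema and a few rows of the DataFrame
dataS3.printSchema()
dataS3.show(5)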
asmaier (Author) commented Sep 8, 2017

Before using this for downloading multiple files in parallel from Amazon S3, read this: https://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
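One common way to avoid slow wildcard listings when reading many objects is to enumerate the keys yourself and hand Spark an explicit list of files. A minimal sketch, assuming boto3 is installed and that the bucket and prefix names are placeholders (this is not part of the original gist; sql is the SparkSession created above):
import boto3

s3 = boto3.client("s3")
# List the keys ourselves instead of letting the Hadoop layer expand a wildcard
# (note: list_objects_v2 returns at most 1000 keys per call)
response = s3.list_objects_v2(Bucket="your_bucket", Prefix="your_path/")
paths = ["s3a://your_bucket/" + obj["Key"]
         for obj in response.get("Contents", [])
         if obj["Key"].endswith(".parquet")]
# Pass the explicit list of files to the reader
dataS3 = sql.read.parquet(*paths)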

@arpanraaj

Hello Techmates,
I have created my AWS free account and uploaded a weather file to a bucket (region: sa-east-1, South America).
Afterwards, I have been trying to read the file from the S3 bucket with PySpark as below:
from pyspark import SparkConf, SparkContext
ak = '*****'
sk = '*****'
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", ak)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", sk)
a = sc.textFile("s3://bucket_name/weatherhistory.txt")
a.collect()
But it shows: /weatherhistory.txt does not exist.

But when I am trying the same using Python (boto3), I can easily read the file.
import boto
import boto.s3.connection
access_key = '*****'
secret_key = '*****'
conn = boto.connect_s3(bucket_name,
aws_access_key_id = access_key,
aws_secret_access_key = secret_key)
.....
.....
I have even listed the keys in spark-defaults.conf as well:
[default]
aws_access_key_id=*****
aws_secret_access_key=*****
But still the error appears: /weatherhistory.txt does not exist.

I have tried this approach as well, but the error is the same.
conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", 1)
        .set("spark.executor.memory", "2g")
        .set("fs.s3.awsAccessKeyId", "*****")
        .set("fs.s3.awsSecretAccessKey", "*****")
        .set("fs.s3.endpoint", "s3-sa-east-1.amazonaws.com")
        .set("com.amazonaws.services.s3.enableV4", "true")
        .set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem"))
sc.conf = conf
a = sc.textFile("s3://bucketname/weatherhistory.txt")

I have even tried to write a file, thinking that my directory path was not correct and that a successful write could pinpoint where it is actually pointing, but there is still no progress and it says no such path exists.

If you could please guide us in this regard, it would really be helpful. Thanks in advance.

asmaier (Author) commented Jun 21, 2019

Try to use "s3a" and not "s3".
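For illustration only, here is roughly what the snippet above would look like with the s3a connector configured as in this gist. This is a sketch, not a tested fix: it assumes the hadoop-aws package is on the classpath (e.g. via PYSPARK_SUBMIT_ARGS as in the gist), and the bucket name, endpoint and credentials are placeholders.
import pyspark

sc = pyspark.SparkContext()
hadoop_conf = sc._jsc.hadoopConfiguration()
# s3a uses its own configuration keys (fs.s3a.*) and filesystem implementation
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "*****")
hadoop_conf.set("fs.s3a.secret.key", "*****")
hadoop_conf.set("fs.s3a.endpoint", "s3.sa-east-1.amazonaws.com")

# Note the s3a:// scheme in the URL as well
a = sc.textFile("s3a://bucket_name/weatherhistory.txt")
print(a.take(5))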
