Instantly share code, notes, and snippets.

Embed
What would you like to do?
import requests
from requests.exceptions import HTTPError, Timeout
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
DEFAULT_INSTANCE_ROLE = 'STAGING_EMR_CLUSTER_ROLE'


def load_redshift_dataframe(sts_credentials, app_name,
                            rs_query="select * from my_table limit 10",
                            rs_tmp_dir='s3n://path/for/temp/data',
                            rs_url='jdbc:redshift://redshifthost:5439/database?user=username&password=pass'):
    """Run a query on Redshift and return the result as a Spark DataFrame.

    Kept inside a function (rather than at module level) so that importing this
    file does not start Spark, and so the inputs are explicit arguments.

    :param sts_credentials: (dict) temporary AWS credentials, e.g. the dict returned
        by ``get_temp_credentials()``; the 'AccessKeyId', 'SecretAccessKey' and
        'Token' entries are read with ``.get``, so it must be a dict (not None)
    :param app_name: (str) Spark application name
    :param rs_query: (str) SQL query to run on Redshift
    :param rs_tmp_dir: (str) S3 path passed as the connector's ``tempdir`` option
    :param rs_url: (str) Redshift JDBC URL; the default is a placeholder — avoid
        committing real usernames/passwords in source
    :return: the DataFrame produced by the ``com.databricks.spark.redshift`` reader
    """
    conf = SparkConf().setAppName(app_name)
    # Only one SparkContext may be active per process; getOrCreate reuses an
    # existing one (in which case ``conf`` is not applied) instead of failing.
    sc = SparkContext.getOrCreate(conf=conf)
    ssc = SQLContext(sc)
    # Create spark dataframe through a Redshift query
    df = ssc.read \
        .format('com.databricks.spark.redshift') \
        .option('url', rs_url) \
        .option('query', rs_query) \
        .option('tempdir', rs_tmp_dir) \
        .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId')) \
        .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey')) \
        .option('temporary_aws_session_token', sts_credentials.get('Token')) \
        .load()
    return df
def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None,
                             ):
    """Build a Redshift credentials string from temporary AWS credentials.

    Keys are taken from the instance role's temporary credentials (fetched with
    ``get_temp_credentials``) unless overridden by the ``local_*`` arguments.
    Note: the instance-role lookup only works on AWS machines!

    :param role: (str) AWS instance role; a falsy value falls back to
        DEFAULT_INSTANCE_ROLE
    :param local_aws_access_key_id: (str) optional override for local testing/dev
    :param local_aws_secret_access_key: (str) optional override for local testing/dev
    :return: (str) 'aws_access_key_id=<id>;aws_secret_access_key=<secret>', with
        ';token=<token>' appended when a session token was retrieved and the two
        local keys were not both supplied
    """
    if not role:
        role = DEFAULT_INSTANCE_ROLE
    use_local_keys = bool(local_aws_access_key_id and local_aws_secret_access_key)
    if use_local_keys:
        # Nothing from the instance role would end up in the result, so skip the
        # metadata-service request (off AWS it is unreachable and only delays).
        sts_credentials = {}
    else:
        sts_credentials = get_temp_credentials(role=role) or dict()
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = (local_aws_secret_access_key
                             or sts_credentials.get('SecretAccessKey'))
    token = sts_credentials.get('Token')
    # NOTE(review): if neither source provides a key, the literal text 'None' is
    # formatted into the string below; callers may want to guard against that.
    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}'.format(
        aws_access_key_id, aws_secret_access_key)
    # The session token only pairs with role-issued keys, so it is omitted when
    # both keys were supplied locally.
    if token and not use_local_keys:
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials
def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE, timeout=5):
    """Retrieve temp AWS credentials for ``role`` from the instance metadata service.

    :param role: (str) name of the IAM role whose credentials are requested
    :param timeout: (float) seconds to wait for the metadata service; without an
        explicit timeout ``requests`` can block indefinitely
    :return: (dict) the decoded JSON document when its 'Code' is 'Success',
        otherwise None (the reason is printed)
    """
    query_uri = 'http://169.254.169.254/latest/meta-data/iam/security-credentials/{}'.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri, timeout=timeout).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        print('There was a problem when retrieving temp credentials '
              'from AWS. Here\'s the response: \n{}'.format(sts_credentials))
    except (HTTPError, requests.exceptions.ConnectionError, ConnectionError, Timeout) as err:
        # requests raises its own ConnectionError, which does not subclass the
        # builtin one, so both are listed. HTTPError is only raised by
        # raise_for_status(), which is not called here.
        msg = 'Unable to query AWS for credentials: {}'.format(err)
        print(msg)
    except ValueError as err:
        # The response body was not JSON; per the hint below, usually a wrong role.
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
        print(msg)
    except Exception as err:
        # Best-effort lookup: any other failure is reported and None is returned.
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)
        print(msg)
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment