import requests
from requests.exceptions import HTTPError, Timeout
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Spark application setup: one SparkContext plus a SQLContext bound to it.
# NOTE(review): `opts` and `sts_credentials` are not defined in the visible
# code; they are assumed to be in scope before this point -- confirm.
conf = SparkConf().setAppName(opts.get('app_name'))
sc = SparkContext(conf=conf)
ssc = SQLContext(sc)

rs_query = "select * from my_table limit 10"
rs_tmp_dir = 's3n://path/for/temp/data'
rs_url = 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'

# Create spark dataframe through a Redshift query.
# The chain is parenthesised instead of using backslash continuations so a
# missing first or last link is an immediate, obvious syntax error.
df = (
    ssc.read
    .format('com.databricks.spark.redshift')
    .option('url', rs_url)
    .option('query', rs_query)
    .option('tempdir', rs_tmp_dir)
    .option('temporary_aws_access_key_id', sts_credentials.get('AccessKeyId'))
    .option('temporary_aws_secret_access_key', sts_credentials.get('SecretAccessKey'))
    .option('temporary_aws_session_token', sts_credentials.get('Token'))
    .load()
)
def get_redshift_credentials(role=DEFAULT_INSTANCE_ROLE,
                             local_aws_access_key_id=None,
                             local_aws_secret_access_key=None):
    """ Returns temp AWS credentials present in an AWS instance

    Note: only works on AWS machines!

    :param role: (str) AWS instance role
    :param local_aws_access_key_id: (str) optional param for local testing/dev
    :param local_aws_secret_access_key: (str) optional param for local testing/dev
    :return:
        (str) temp credentials to be used in query, formatted as
        'aws_access_key_id=...;aws_secret_access_key=...[;token=...]',
        or None when no role is given
    """
    if not role:
        # Without a role there is nothing to look up.
        return None

    # get_temp_credentials may return a falsy value on failure; fall back to
    # an empty dict so the .get() lookups below stay safe.
    sts_credentials = get_temp_credentials(role=role) or dict()

    # Explicitly supplied local keys take precedence over the fetched ones.
    aws_access_key_id = local_aws_access_key_id or sts_credentials.get('AccessKeyId')
    aws_secret_access_key = local_aws_secret_access_key or sts_credentials.get('SecretAccessKey')
    token = sts_credentials.get('Token')

    redshift_credentials = 'aws_access_key_id={0};aws_secret_access_key={1}' \
        .format(aws_access_key_id, aws_secret_access_key)

    # The session token is appended only when a token was fetched and the
    # caller did not supply both local keys.
    if token and not all((local_aws_access_key_id, local_aws_secret_access_key)):
        redshift_credentials = '{0};token={1}'.format(redshift_credentials, token)
    return redshift_credentials
def get_temp_credentials(role=DEFAULT_INSTANCE_ROLE):
    """ Retrieves temp AWS credentials

    Issues an HTTP GET for the URI built from ``role`` and decodes the
    response body as JSON.

    :param role: (str) AWS instance role
    :return:
        (dict) the decoded response when it is a dict whose 'Code' entry is
        'Success'; None otherwise (the reason is printed)
    """
    # NOTE(review): the URI template holds only the role placeholder; the
    # scheme/host/path prefix of the credentials endpoint appears to be
    # missing -- confirm the intended URL before relying on this.
    query_uri = '{}'.format(role)
    print('Querying AWS for credentials - {}'.format(query_uri))
    try:
        sts_credentials = requests.get(query_uri).json()
        if isinstance(sts_credentials, dict) and \
                sts_credentials.get('Code') == 'Success':
            print('Successfully retrieved temp AWS credentials.')
            return sts_credentials
        msg = 'There was a problem when retrieving temp credentials ' \
              'from AWS. Here\'s the response: \n{}'.format(sts_credentials)
    # Use the requests ConnectionError explicitly: the bare name refers to
    # the unrelated builtin on Python 3 and is undefined on Python 2.
    except (HTTPError, requests.exceptions.ConnectionError, Timeout) as err:
        msg = 'Unable to query AWS for credentials: {}'.format(err)
    except ValueError as err:
        msg = 'Error: unable to decode json from \'None\' value - {} ' \
              '(hint: most likely the role you are using is wrong)'.format(err)
    except Exception as err:
        msg = 'Failed to get AWS role temp credentials: {}'.format(err)

    # Every failure path ends here: report the reason and signal failure
    # with None.
    print(msg)
    return None
