Last active
July 17, 2024 04:31
-
-
Save ilkkapeltola/929579b9d689809ce81ad93a954d50d3 to your computer and use it in GitHub Desktop.
query Athena using boto3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import pandas as pd | |
import io | |
import re | |
import time | |
# Settings read by the helper functions below. The values are placeholders:
# substitute your own region, database, bucket, output path and query.
params = dict(
    region='eu-central-1',
    database='databasename',
    bucket='your-bucket-name',
    path='temp/athena/output',
    query='SELECT * FROM tablename LIMIT 100',
)

# boto3 session created with no explicit arguments; it is handed to every
# helper that needs an AWS client or resource.
session = boto3.Session()
def athena_query(client, params):
    """Submit the query described by ``params`` through ``client``.

    Args:
        client: Object exposing ``start_query_execution`` (an Athena client).
        params: Mapping with the ``query``, ``database``, ``bucket`` and
            ``path`` entries. Results are directed to ``s3://<bucket>/<path>``.

    Returns:
        Whatever ``client.start_query_execution`` returns, unmodified.
    """
    # Entries are read in the order query, database, bucket, path.
    request = {
        'QueryString': params['query'],
        'QueryExecutionContext': {'Database': params['database']},
        'ResultConfiguration': {
            'OutputLocation': 's3://' + '/'.join([params['bucket'], params['path']]),
        },
    }
    return client.start_query_execution(**request)
def athena_to_s3(session, params, max_execution=5):
    """Run the query in ``params`` on Athena and wait for its result file.

    The query is started with ``athena_query`` and its status is polled up to
    ``max_execution`` times, pausing one second between polls.

    Args:
        session: Object whose ``client('athena', region_name=...)`` returns an
            Athena client.
        params: Mapping with a ``region`` entry plus the entries that
            ``athena_query`` reads.
        max_execution: Maximum number of status polls before giving up.

    Returns:
        The final path component of the query's ``OutputLocation`` when the
        query reaches ``SUCCEEDED``; ``False`` when it ends in any other
        terminal state or is still ``RUNNING``/``QUEUED`` after the last poll.
    """
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']

    polls_left = max_execution
    while polls_left > 0:
        polls_left -= 1
        response = client.get_query_execution(QueryExecutionId=execution_id)
        query_execution = response.get('QueryExecution', {})
        # ``state`` is None when the response carries no status; treat that
        # like "still in progress" and poll again.
        state = query_execution.get('Status', {}).get('State')

        if state == 'SUCCEEDED':
            s3_path = query_execution['ResultConfiguration']['OutputLocation']
            # Keep only the object name after the last '/'. A plain split
            # avoids the invalid "\/" escape of the former regex literal.
            return s3_path.rsplit('/', 1)[-1]
        if state is not None and state not in ('RUNNING', 'QUEUED'):
            # FAILED, CANCELLED or any other terminal state: stop right away.
            return False
        if polls_left > 0:
            # Only wait when another poll will actually follow.
            time.sleep(1)

    return False
def s3_to_pandas(session, params, s3_filename):
    """Fetch a CSV object from S3 and parse it into a pandas DataFrame.

    Args:
        session: Object whose ``client('s3')`` returns an S3 client.
        params: Mapping with the ``bucket`` and ``path`` entries.
        s3_filename: Object name; the key read is ``<path>/<s3_filename>``.

    Returns:
        The DataFrame produced by ``pd.read_csv`` from the object's bytes.
    """
    s3_client = session.client('s3')
    s3_object = s3_client.get_object(
        Bucket=params['bucket'],
        Key='/'.join((params['path'], s3_filename)),
    )
    # Read the whole body into memory, then let pandas parse the buffer.
    payload = s3_object['Body'].read()
    return pd.read_csv(io.BytesIO(payload))
# Deletes all files in your path so use carefully!
def cleanup(session, params):
    """Delete every object stored under ``params['path']`` in the bucket.

    Args:
        session: Object whose ``resource('s3')`` returns an S3 resource.
        params: Mapping with the ``bucket`` and ``path`` entries.

    The listing prefix is a plain string match on object keys, so the path is
    anchored with a trailing ``/``. Without it, a path such as
    ``temp/athena/output`` would also match, and delete, sibling keys like
    ``temp/athena/output-old/...``. An empty path is passed through unchanged
    and therefore still matches every object in the bucket.
    """
    prefix = params['path']
    if prefix and not prefix.endswith('/'):
        prefix += '/'

    s3 = session.resource('s3')
    my_bucket = s3.Bucket(params['bucket'])
    for item in my_bucket.objects.filter(Prefix=prefix):
        item.delete()
# Run the query on Athena. s3_filename receives the name of the result file,
# or False when athena_to_s3 did not see the query succeed.
s3_filename = athena_to_s3(session, params)

# Delete the objects stored under params['path'] in the bucket -- this is
# destructive, so use carefully.
cleanup(session, params)
This code fails when the query state is `QUEUED` (state == 'QUEUED').
I suggest editing line 36 as:
while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
Good point. Edited. Thanks!
You have not added credentials, and hence it throws an error:
NoCredentialsError: Unable to locate credentials
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code fails when the query state is `QUEUED` (state == 'QUEUED').
I suggest editing line 36 as:
while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):