Last active
January 3, 2023 21:06
-
-
Save ilkkapeltola/929579b9d689809ce81ad93a954d50d3 to your computer and use it in GitHub Desktop.
Query Athena using boto3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import pandas as pd | |
import io | |
import re | |
import time | |
# Settings shared by every helper below. 'path' has no trailing slash:
# the helpers join it to other components with '/' themselves.
params = dict(
    region='eu-central-1',
    database='databasename',
    bucket='your-bucket-name',
    path='temp/athena/output',
    query='SELECT * FROM tablename LIMIT 100',
)

# A boto3 session created with no explicit arguments; clients and resources
# for Athena and S3 are derived from it by the functions below.
session = boto3.Session()
def athena_query(client, params):
    """Submit ``params['query']`` to Athena and return the API response as-is.

    Args:
        client: an Athena client (anything exposing ``start_query_execution``).
        params: mapping with 'query', 'database', 'bucket' and 'path' keys.

    Returns:
        The ``start_query_execution`` response; callers read its
        'QueryExecutionId' to poll for completion.
    """
    # Results are written under s3://<bucket>/<path>.
    output_location = ''.join(('s3://', params['bucket'], '/', params['path']))
    return client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={'Database': params['database']},
        ResultConfiguration={'OutputLocation': output_location},
    )
def athena_to_s3(session, params, max_execution=5, poll_interval=1):
    """Run ``params['query']`` on Athena and wait for it to finish.

    The query is started via ``athena_query`` and its status is polled up to
    ``max_execution`` times, sleeping ``poll_interval`` seconds between polls.

    Args:
        session: boto3 session used to create the Athena client.
        params: mapping with 'region' plus the keys ``athena_query`` reads.
        max_execution: maximum number of status polls.
        poll_interval: seconds to wait between consecutive polls.

    Returns:
        On success, the file name of the result (the last '/'-separated
        component of the execution's S3 OutputLocation). False if the query
        reports FAILED, leaves RUNNING/QUEUED without succeeding, or is still
        running once the poll budget is exhausted.
    """
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']

    state = 'RUNNING'
    while max_execution > 0 and state in ('RUNNING', 'QUEUED'):
        max_execution -= 1
        response = client.get_query_execution(QueryExecutionId=execution_id)
        # Keep the previous state if the response lacks a status field.
        status = response.get('QueryExecution', {}).get('Status', {})
        state = status.get('State', state)

        if state == 'FAILED':
            return False
        if state == 'SUCCEEDED':
            s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            # Same result as the greedy regex '.*/(.*)': text after the last '/'.
            return s3_path.rsplit('/', 1)[-1]

        # Only wait if another poll will actually happen.
        if max_execution > 0 and state in ('RUNNING', 'QUEUED'):
            time.sleep(poll_interval)
    return False
def s3_to_pandas(session, params, s3_filename):
    """Fetch an Athena result CSV from S3 and parse it into a DataFrame.

    The object is looked up in ``params['bucket']`` under the key
    ``params['path'] + '/' + s3_filename``. The whole body is read into
    memory before being handed to ``pd.read_csv``.
    """
    s3 = session.client('s3')
    object_key = params['path'] + '/' + s3_filename
    response = s3.get_object(Bucket=params['bucket'], Key=object_key)
    payload = response['Body'].read()
    return pd.read_csv(io.BytesIO(payload))
# Deletes all files in your path so use carefully!
def cleanup(session, params):
    """Delete every object stored under ``params['path']`` in ``params['bucket']``.

    The prefix is normalised to end with '/', so that sibling prefixes sharing
    the same leading text (e.g. 'temp/athena/output2/...') are not deleted too.

    Raises:
        ValueError: if ``params['path']`` is empty or only slashes; such a
            prefix would match, and delete, every object in the bucket.
    """
    path = params['path']
    if not path or not path.strip('/'):
        raise ValueError("refusing to clean up: params['path'] is empty and "
                         "would match the whole bucket")
    prefix = path if path.endswith('/') else path + '/'

    bucket = session.resource('s3').Bucket(params['bucket'])
    # Collection-level delete() is a boto3 batch action (DeleteObjects in
    # batches) instead of one DELETE request per object.
    bucket.objects.filter(Prefix=prefix).delete()
# Query Athena and get the s3 filename as a result (False on failure/timeout).
s3_filename = athena_to_s3(session, params)

# Load the result before cleanup() deletes it; previously the file was
# deleted without ever being read.
df = None
try:
    if s3_filename:
        df = s3_to_pandas(session, params, s3_filename)
finally:
    # Removes all files from your s3 path, so use carefully
    cleanup(session, params)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Good point. Edited. Thanks!