Skip to content

Instantly share code, notes, and snippets.

@ilkkapeltola
Last active January 3, 2023 21:06
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save ilkkapeltola/929579b9d689809ce81ad93a954d50d3 to your computer and use it in GitHub Desktop.
Save ilkkapeltola/929579b9d689809ce81ad93a954d50d3 to your computer and use it in GitHub Desktop.
query Athena using boto3
import boto3
import pandas as pd
import io
import re
import time
# Settings shared by every helper in this script:
#   'region'   - AWS region name handed to the Athena client
#   'database' - Athena database the query is executed against
#   'bucket'   - S3 bucket that receives (and later serves) the query results
#   'path'     - key prefix inside the bucket used as the Athena output location
#   'query'    - SQL text submitted to Athena
params = {
'region': 'eu-central-1',
'database': 'databasename',
'bucket': 'your-bucket-name',
'path': 'temp/athena/output',
'query': 'SELECT * FROM tablename LIMIT 100'
}
# Single boto3 session (created with no arguments) that the helpers below use
# to build their Athena and S3 clients/resources.
session = boto3.Session()
def athena_query(client, params):
    """Start an Athena query described by ``params`` and return the raw response.

    Args:
        client: Athena client object exposing ``start_query_execution``.
        params: Mapping providing the keys ``query``, ``database``,
            ``bucket`` and ``path``.

    Returns:
        The value returned by ``client.start_query_execution``, unmodified.
    """
    # Built as a dict so each request section is named; Athena writes its
    # result files under s3://<bucket>/<path>.
    request = {
        'QueryString': params['query'],
        'QueryExecutionContext': {'Database': params['database']},
        'ResultConfiguration': {
            'OutputLocation': 's3://' + '/'.join([params['bucket'], params['path']]),
        },
    }
    return client.start_query_execution(**request)
def athena_to_s3(session, params, max_execution=5):
    """Run the query in ``params`` on Athena and wait for its result file.

    The query is started through ``athena_query`` and its status is then
    polled, pausing one second between polls.

    Args:
        session: boto3 session used to create the Athena client.
        params: Mapping with ``region`` plus the keys ``athena_query`` reads
            (``query``, ``database``, ``bucket``, ``path``).
        max_execution: Maximum number of status polls before giving up.

    Returns:
        The result file name (the last path component of the query's S3
        ``OutputLocation``) when the query succeeds; ``False`` when it ends
        in any other state or is still pending after ``max_execution`` polls.
    """
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']

    pending_states = ('RUNNING', 'QUEUED')
    remaining = max_execution
    while remaining > 0:
        remaining -= 1
        response = client.get_query_execution(QueryExecutionId=execution_id)
        query_execution = response.get('QueryExecution', {})
        # A response without a state is treated as "still pending".
        state = query_execution.get('Status', {}).get('State')

        if state == 'SUCCEEDED':
            s3_path = query_execution['ResultConfiguration']['OutputLocation']
            # Plain string split instead of a regex: the former pattern
            # relied on the invalid escape sequence '\/'.
            return s3_path.rsplit('/', 1)[-1]
        if state is not None and state not in pending_states:
            # FAILED, CANCELLED or anything else terminal: stop right away.
            return False

        # Only wait when another poll is actually going to happen.
        if remaining > 0:
            time.sleep(1)
    return False
def s3_to_pandas(session, params, s3_filename):
    """Fetch a CSV object from S3 and parse it into a pandas DataFrame.

    Args:
        session: Session object used to create the S3 client.
        params: Mapping providing ``bucket`` and ``path``.
        s3_filename: File name appended to ``params['path']`` (joined with
            ``/``) to form the object key.

    Returns:
        The DataFrame produced by ``pd.read_csv`` from the object's bytes.
    """
    response = session.client('s3').get_object(
        Bucket=params['bucket'],
        Key='/'.join([params['path'], s3_filename]),
    )
    # The whole body is read into memory and handed to pandas as a buffer.
    csv_bytes = response['Body'].read()
    return pd.read_csv(io.BytesIO(csv_bytes))
# Deletes all files in your path so use carefully!
def cleanup(session, params):
    """Delete every object stored under ``params['path']`` in the bucket.

    The prefix is terminated with ``/`` before listing, so only objects
    inside that "directory" are removed and sibling prefixes that merely
    share the same leading characters (e.g. ``output_old/``) are left alone.
    An empty path is passed through unchanged and therefore matches the
    whole bucket.

    Args:
        session: Session object used to create the S3 resource.
        params: Mapping providing ``bucket`` and ``path``.

    Returns:
        None.
    """
    prefix = params['path']
    if prefix and not prefix.endswith('/'):
        prefix += '/'

    bucket = session.resource('s3').Bucket(params['bucket'])
    # Collection-level delete removes the matched objects in batches rather
    # than issuing one request per object.
    bucket.objects.filter(Prefix=prefix).delete()
# Query Athena and get the S3 result file name. athena_to_s3 returns False
# instead of a name when the query does not reach SUCCEEDED within its
# polling budget, so check the value before using it.
s3_filename = athena_to_s3(session, params)
# Removes all files under params['path'] in the bucket, so use carefully.
# NOTE(review): nothing visible in this script loads the result (e.g. with
# s3_to_pandas) before it is deleted here - read it first if you need it.
cleanup(session, params)
@ilkkapeltola
Copy link
Author

This code fails when you have state == 'QUEUED'.
I suggest editing line 36 as:

while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):

Good point. Edited. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment