Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
query Athena using boto3
import boto3
import pandas as pd
import io
import re
import time
params = {
'region': 'eu-central-1',
'database': 'databasename',
'bucket': 'your-bucket-name',
'path': 'temp/athena/output',
'query': 'SELECT * FROM tablename LIMIT 100'
}
session = boto3.Session()
def athena_query(client, params):
response = client.start_query_execution(
QueryString=params["query"],
QueryExecutionContext={
'Database': params['database']
},
ResultConfiguration={
'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
}
)
return response
def athena_to_s3(session, params, max_execution = 5):
client = session.client('athena', region_name=params["region"])
execution = athena_query(client, params)
execution_id = execution['QueryExecutionId']
state = 'RUNNING'
while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):
max_execution = max_execution - 1
response = client.get_query_execution(QueryExecutionId = execution_id)
if 'QueryExecution' in response and \
'Status' in response['QueryExecution'] and \
'State' in response['QueryExecution']['Status']:
state = response['QueryExecution']['Status']['State']
if state == 'FAILED':
return False
elif state == 'SUCCEEDED':
s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
filename = re.findall('.*\/(.*)', s3_path)[0]
return filename
time.sleep(1)
return False
def s3_to_pandas(session, params, s3_filename):
s3client = session.client('s3')
obj = s3client.get_object(Bucket=params['bucket'],
Key=params['path'] + '/' + s3_filename)
df = pd.read_csv(io.BytesIO(obj['Body'].read()))
return df
# Deletes all files in your path so use carefully!
def cleanup(session, params):
s3 = session.resource('s3')
my_bucket = s3.Bucket(params['bucket'])
for item in my_bucket.objects.filter(Prefix=params['path']):
item.delete()
# Query Athena and get the s3 filename as a result
s3_filename = athena_to_s3(session, params)
# Removes all files from your s3 path, so use carefully
cleanup(session, params)
@NicolasGuary

This comment has been minimized.

Copy link

@NicolasGuary NicolasGuary commented Jul 6, 2020

This code fails when you have state == 'QUEUED'.
I suggest editing line 36 as:

while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):

@ilkkapeltola

This comment has been minimized.

Copy link
Owner Author

@ilkkapeltola ilkkapeltola commented Jul 6, 2020

This code fails when you have state == 'QUEUED'.
I suggest editing line 36 as:

while (max_execution > 0 and state in ['RUNNING', 'QUEUED']):

Good point. Edited. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment