# Query Athena using boto3
import boto3
import pandas as pd
import io
import re
import time
# Settings read by the helper functions in this script.
params = {
    'region': 'eu-central-1',                      # region for the Athena client
    'database': 'databasename',                    # database the query runs against
    'bucket': 'your-bucket-name',                  # S3 bucket for the query output
    'path': 'temp/athena/output',                  # key prefix inside the bucket
    'query': 'SELECT * FROM tablename LIMIT 100',  # SQL statement to execute
}

# boto3 session from which the Athena and S3 clients are created.
session = boto3.Session(profile_name='profile-optional')
def athena_query(client, params):
    """Start an Athena query and return the raw ``start_query_execution`` response.

    Args:
        client: Athena client exposing ``start_query_execution``.
        params: Mapping with the keys ``'query'`` (SQL text), ``'database'``,
            ``'bucket'`` and ``'path'``. Results are written to
            ``s3://<bucket>/<path>``.

    Returns:
        The response returned by ``client.start_query_execution``.

    Raises:
        KeyError: If a required key is missing from ``params``.
    """
    response = client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database'],
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path'],
        },
    )
    return response
def athena_to_s3(session, params, max_execution=5, poll_interval=1):
    """Run the query in ``params`` on Athena and wait for its result file.

    Args:
        session: Object exposing ``client(service, region_name=...)``
            (a boto3 session).
        params: Mapping with ``'region'`` plus the keys required by
            ``athena_query``.
        max_execution: Maximum number of status polls before giving up.
        poll_interval: Seconds to wait between two consecutive polls.

    Returns:
        The file name (last path component of the query's ``OutputLocation``)
        when the query succeeds; ``False`` when it fails, is cancelled, or
        does not finish within ``max_execution`` polls.
    """
    client = session.client('athena', region_name=params['region'])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']

    for attempt in range(max_execution):
        # Pause between polls only; the first status check happens right away.
        if attempt:
            time.sleep(poll_interval)

        response = client.get_query_execution(QueryExecutionId=execution_id)
        query_execution = response.get('QueryExecution', {})
        state = query_execution.get('Status', {}).get('State')

        if state == 'SUCCEEDED':
            s3_path = query_execution['ResultConfiguration']['OutputLocation']
            # Keep only the object name after the last '/'.
            return s3_path.rsplit('/', 1)[-1]
        if state in ('FAILED', 'CANCELLED'):
            return False
        # 'QUEUED', 'RUNNING' or a response without a state: keep polling.

    return False
def s3_to_pandas(session, params, s3_filename):
    """Download a CSV object from S3 and parse it into a DataFrame.

    Args:
        session: Object exposing ``client('s3')`` (a boto3 session).
        params: Mapping with ``'bucket'`` and ``'path'``; the object key is
            ``<path>/<s3_filename>``.
        s3_filename: Name of the CSV object below ``params['path']``.

    Returns:
        pandas.DataFrame built from the object's CSV content.
    """
    s3client = session.client('s3')
    obj = s3client.get_object(
        Bucket=params['bucket'],
        Key=params['path'] + '/' + s3_filename,
    )
    # The whole body is read into memory before being handed to pandas.
    df = pd.read_csv(io.BytesIO(obj['Body'].read()))
    return df
# Deletes all files in your path so use carefully!
def cleanup(session, params):
    """Delete every object in the bucket whose key starts with ``params['path']``.

    The match is a plain key-prefix filter, so keys that merely share the
    prefix are deleted as well; an empty ``'path'`` matches every key.

    Args:
        session: Object exposing ``resource('s3')`` (a boto3 session).
        params: Mapping with ``'bucket'`` and ``'path'``.

    Returns:
        None.
    """
    s3 = session.resource('s3')
    my_bucket = s3.Bucket(params['bucket'])
    for item in my_bucket.objects.filter(Prefix=params['path']):
        item.delete()
# Query Athena and get the S3 filename as a result.
# `athena_to_s3` returns False instead of a filename when the query fails
# or does not finish within its polling limit.
s3_filename = athena_to_s3(session, params)
# Removes all files from your S3 path, so use carefully.
# NOTE(review): `s3_filename` is not read before this call; presumably a
# loading step such as `s3_to_pandas` belongs in between — confirm.
cleanup(session, params)
