Skip to content

Instantly share code, notes, and snippets.

@williambarretolopes
Forked from sysboss/query_athena.py
Created September 10, 2019 03:29
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save williambarretolopes/0d8d5b97ef7703eed4a6fbf162965571 to your computer and use it in GitHub Desktop.
Save williambarretolopes/0d8d5b97ef7703eed4a6fbf162965571 to your computer and use it in GitHub Desktop.
SQL Query Amazon Athena using Python
#!/usr/bin/env python3
#
# Query AWS Athena using SQL
# Copyright (c) Alexey Baikov <sysboss[at]mail.ru>
#
# This snippet is a basic example of how to query Athena and load the
# results into a variable.
#
# Requirements:
# > pip3 install boto3 botocore retrying
import os
import sys
import csv
import boto3
import botocore
from retrying import retry
# configuration
s3_bucket = 'athenaoutput'  # name of the S3 bucket that receives query results
# S3 URI passed to Athena as the output location (the identifier spelling is
# kept as-is because the script entry point refers to it by this name)
s3_ouput = 's3://{}'.format(s3_bucket)
database = 'datalake_database'  # database the query is executed against

# init clients
athena = boto3.client('athena')
s3 = boto3.resource('s3')
@retry(stop_max_attempt_number=10,
       wait_exponential_multiplier=300,
       wait_exponential_max=1 * 60 * 1000)
def poll_status(_id):
    """Return the execution record of an Athena query once it has finished.

    The ``retry`` decorator calls this function again whenever it raises
    (exponential back-off, at most 10 attempts), so raising is how
    "not finished yet" is signalled to the retry loop.

    Args:
        _id: The ``QueryExecutionId`` of the query to poll.

    Returns:
        The ``get_query_execution`` response once the query state is
        ``SUCCEEDED``, ``FAILED`` or ``CANCELLED``.

    Raises:
        Exception: If the query is still in a non-terminal state on the
            final attempt.
    """
    result = athena.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']

    # CANCELLED is terminal as well: polling it again can never succeed, so
    # hand it back immediately instead of exhausting every retry attempt.
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        return result

    # Any other state means the query is still in progress; raising makes
    # the decorator wait and poll again.
    raise Exception(
        "Athena query {} is still in state {}".format(_id, state))
def run_query(query, database, s3_output):
    """Run a SQL query on Athena and return its result rows.

    The result file is fetched from the module-level ``s3_bucket`` under the
    key ``<QueryExecutionId>.csv``, so ``s3_output`` is expected to point at
    the root of that bucket.

    Args:
        query: SQL text to execute.
        database: Name of the database the query runs against.
        s3_output: S3 URI passed to Athena as the result output location.

    Returns:
        A list with one dict per result row (as produced by
        ``csv.DictReader``), or ``None`` when the query did not succeed or
        the result object could not be found in S3.

    Raises:
        botocore.exceptions.ClientError: For any S3 download error other
            than a 404.
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    query_execution_id = response['QueryExecutionId']

    result = poll_status(query_execution_id)
    status = result['QueryExecution']['Status']
    if status['State'] != 'SUCCEEDED':
        # Report the failure instead of returning None without a trace.
        print("Query {}: {} ({})".format(
            status['State'],
            query_execution_id,
            status.get('StateChangeReason', 'no reason given')))
        return None

    print("Query SUCCEEDED: {}".format(query_execution_id))

    s3_key = query_execution_id + '.csv'
    local_filename = query_execution_id + '.csv'

    try:
        # download result file
        try:
            s3.Bucket(s3_bucket).download_file(s3_key, local_filename)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
                # Nothing was downloaded, so there is no file to parse;
                # falling through to open() would raise FileNotFoundError.
                return None
            raise

        # read file to array; newline='' lets the csv module handle line
        # breaks embedded in quoted fields
        with open(local_filename, newline='') as csvfile:
            return list(csv.DictReader(csvfile))
    finally:
        # delete result file, even when the download or parsing failed
        if os.path.isfile(local_filename):
            os.remove(local_filename)
if __name__ == '__main__':
    # SQL query to execute, assembled line by line
    query = (
        "\n"
        "SELECT id, name\n"
        "FROM example\n"
        "LIMIT 20\n"
    )

    announcement = "Executing query: {}".format(query)
    print(announcement)

    # Run the query against the configured database and output location,
    # then dump whatever came back.
    result = run_query(query, database, s3_ouput)
    print("Results:")
    print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment