Skip to content

Instantly share code, notes, and snippets.

@sysboss
Created May 21, 2018 15:41
Show Gist options
  • Save sysboss/d40ea8a7a12f510e61d7980269323b36 to your computer and use it in GitHub Desktop.
Save sysboss/d40ea8a7a12f510e61d7980269323b36 to your computer and use it in GitHub Desktop.
SQL Query Amazon Athena using Python
#!/usr/bin/env python3
#
# Query AWS Athena using SQL
# Copyright (c) Alexey Baikov <sysboss[at]mail.ru>
#
# This snippet is a basic example of querying Athena and loading the results
# into a variable.
#
# Requirements:
# > pip3 install boto3 botocore retrying
import csv
import os
import sys
import tempfile

import boto3
import botocore
from retrying import retry
# configuration
s3_bucket = 'athenaoutput'  # S3 bucket name
s3_output = 's3://' + s3_bucket  # S3 location Athena writes query results to
s3_ouput = s3_output  # original (misspelled) name, kept so existing callers still work
database = 'datalake_database'  # The database to which the query belongs

# init clients -- these are module-level statements, so importing this
# module creates the boto3 clients.
athena = boto3.client('athena')
s3 = boto3.resource('s3')
class QueryStillRunning(Exception):
    """Raised by poll_status while an Athena query has not reached a final state."""


# Athena states after which a query execution will not change state again.
_TERMINAL_STATES = frozenset({'SUCCEEDED', 'FAILED', 'CANCELLED'})


# Retry only while the query is still running: retrying a permanent API error
# (bad execution id, missing permission) just delays the failure. Transient
# AWS errors are presumably retried inside botocore itself -- NOTE(review):
# confirm the botocore retry configuration if that matters here.
# With these settings the total polling budget is a few minutes; a query that
# runs longer surfaces as QueryStillRunning.
@retry(stop_max_attempt_number=10,
       wait_exponential_multiplier=300,
       wait_exponential_max=1 * 60 * 1000,
       retry_on_exception=lambda exc: isinstance(exc, QueryStillRunning))
def poll_status(_id):
    """Wait for Athena query execution ``_id`` to finish.

    Args:
        _id: the QueryExecutionId returned by ``start_query_execution``.

    Returns:
        The full ``get_query_execution`` response once the state is
        SUCCEEDED, FAILED or CANCELLED. Callers inspect
        ``result['QueryExecution']['Status']['State']``.

    Raises:
        QueryStillRunning: the query was still queued/running when the
            retry budget ran out.
        botocore.exceptions.ClientError: ``get_query_execution`` itself
            failed (propagated immediately, not retried here).
    """
    result = athena.get_query_execution(QueryExecutionId=_id)
    state = result['QueryExecution']['Status']['State']
    if state in _TERMINAL_STATES:
        return result
    raise QueryStillRunning(
        'Athena query {} is still in state {}'.format(_id, state))
def _split_s3_uri(uri):
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``."""
    if not uri.startswith('s3://'):
        raise ValueError('not an S3 URI: {!r}'.format(uri))
    bucket, _, key = uri[len('s3://'):].partition('/')
    return bucket, key


def _result_location(execution, s3_output, query_execution_id):
    """Return ``(bucket, key)`` of the CSV result file for a finished query."""
    # Athena reports the URI of the result file for this execution; using it
    # keeps us correct when s3_output names another bucket or has a prefix.
    location = execution.get('ResultConfiguration', {}).get('OutputLocation')
    if location:
        return _split_s3_uri(location)
    # Fallback: rebuild <s3_output>/<query id>.csv from the caller's location.
    bucket, prefix = _split_s3_uri(s3_output)
    filename = query_execution_id + '.csv'
    prefix = prefix.rstrip('/')
    key = '{}/{}'.format(prefix, filename) if prefix else filename
    return bucket, key


def _read_csv_rows(path):
    """Return the rows of the CSV file at ``path`` as a list of dicts."""
    # newline='' is what the csv module requires for correct parsing of
    # quoted fields; assumes Athena result files are UTF-8 -- NOTE(review): confirm.
    with open(path, newline='', encoding='utf-8') as csvfile:
        return list(csv.DictReader(csvfile))


def run_query(query, database, s3_output):
    """Run ``query`` on Athena, wait for it, and return the result rows.

    Blocks while ``poll_status`` waits for the query to finish.

    Args:
        query: SQL text to execute.
        database: database the query runs against (QueryExecutionContext).
        s3_output: S3 URI (``s3://bucket[/prefix]``) where Athena writes results.

    Returns:
        A list of dicts, one per result row keyed by column header, when the
        query succeeds; ``None`` when the query did not succeed or its result
        file was not found in S3.
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    query_execution_id = response['QueryExecutionId']
    execution = poll_status(query_execution_id)['QueryExecution']

    status = execution['Status']
    state = status['State']
    if state != 'SUCCEEDED':
        reason = status.get('StateChangeReason', 'no reason given')
        print("Query {}: {} ({})".format(state, query_execution_id, reason))
        return None

    print("Query SUCCEEDED: {}".format(query_execution_id))
    bucket, key = _result_location(execution, s3_output, query_execution_id)

    # The temporary directory (and the downloaded file) is removed on every
    # exit path, including a CSV parsing error.
    with tempfile.TemporaryDirectory() as tmpdir:
        local_filename = os.path.join(tmpdir, query_execution_id + '.csv')
        try:
            s3.Bucket(bucket).download_file(key, local_filename)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
                return None
            raise
        return _read_csv_rows(local_filename)
if __name__ == '__main__':
    # Demo run: execute a small SELECT and dump whatever run_query returns.
    query = "\nSELECT id, name\nFROM example\nLIMIT 20\n"
    print("Executing query: " + query)

    result = run_query(query, database, s3_ouput)

    print("Results:")
    print(result)
@sannithibalaji
Copy link

Hello, this code gives me the following error. Please help me out:
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 1:55: no viable alternative at input '=''

There is probably an error in the query you are passing. Can you post the query?

@mayankA47
Copy link

Works perfectly! Does it also save the query results to S3? If so, how can I avoid that?

@yfellous
Copy link

Thank you :)

@HoangYell
Copy link

We can directly query the result from Athena instead of downloading and reading the result file from S3.

athena_result = athena.get_query_results(QueryExecutionId=query_execution_id)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment