Skip to content

Instantly share code, notes, and snippets.

@thomasvincent
Last active June 12, 2024 22:07
Show Gist options
  • Save thomasvincent/0880d8d38e596d5442daaa58e9cfff74 to your computer and use it in GitHub Desktop.
Save thomasvincent/0880d8d38e596d5442daaa58e9cfff74 to your computer and use it in GitHub Desktop.
Athena query in Python
#!/usr/bin/env python3
"""
Author: Thomas Vincent
Date: June 12, 2024
Gist: https://gist.github.com/thomasvincent/0880d8d38e596d5442daaa58e9cfff74
This script executes Athena queries to create and manage a database and table,
then runs additional queries against the data.
"""
import time

import boto3
# Module-wide configuration.
# S3_INPUT becomes the LOCATION of the external table built by create_table;
# S3_OUTPUT is passed to Athena as the OutputLocation for every query result.
S3_INPUT = "s3://cratebarrel/data"
S3_OUTPUT = "s3://athena-project/results/"

# Names of the Athena database and table this script manages.
DATABASE = "test_database"
TABLE = "persons"

# Single Athena client shared by the helper functions as their default client.
# NOTE(review): built at import time, so merely importing this module presumably
# requires a configured AWS region — consider creating it lazily.
ATHENA_CLIENT = boto3.client("athena")
def run_query(query, database, s3_output, client=None, *, poll_interval=1.0, timeout=None):
    """Execute an Athena query and block until it reaches a terminal state.

    boto3's Athena client provides no waiters, so completion is detected by
    polling ``get_query_execution`` until the query is SUCCEEDED, FAILED or
    CANCELLED.

    Args:
        query (str): The SQL query to execute.
        database (str): The database used as the query's execution context.
        s3_output (str): The S3 location where Athena writes query results.
        client: The Boto3 Athena client (optional, for testing). When omitted,
            the module-level ``ATHENA_CLIENT`` is used (looked up at call time).
        poll_interval (float): Seconds to sleep between status checks.
        timeout (float | None): Maximum seconds to wait for a terminal state;
            ``None`` waits indefinitely.

    Returns:
        str: The ID of the executed query.

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state; the
            message includes Athena's ``StateChangeReason`` when available.
        TimeoutError: If ``timeout`` elapses first. The query itself is left
            running; it is not cancelled.
    """
    if client is None:
        client = ATHENA_CLIENT

    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    execution_id = response['QueryExecutionId']
    print(f'Execution ID: {execution_id}')

    # Monotonic clock so wall-clock adjustments cannot shorten/extend the wait.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        execution = client.get_query_execution(QueryExecutionId=execution_id)
        status = execution['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return execution_id
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason', 'no reason given')
            raise RuntimeError(f'Athena query {execution_id} {state}: {reason}')
        # Not terminal yet (e.g. QUEUED or RUNNING): check the budget, then wait.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f'Athena query {execution_id} still {state} after {timeout} seconds'
            )
        time.sleep(poll_interval)
def create_database(database, client=ATHENA_CLIENT):
    """Ensure ``database`` exists in Athena.

    Sends ``CREATE DATABASE IF NOT EXISTS`` through ``run_query``, using
    ``database`` as the execution context and writing results to ``S3_OUTPUT``.
    The query's execution ID is not returned.

    Args:
        database (str): Name of the database to create.
        client: The Boto3 Athena client (optional, for testing).
    """
    ddl = "CREATE DATABASE IF NOT EXISTS {};".format(database)
    run_query(ddl, database, S3_OUTPUT, client)
def create_table(database, table, s3_input, client=ATHENA_CLIENT):
    """Create an external table over JSON data in S3 if it doesn't exist.

    Defines ``database.table`` with a fixed six-column schema, read through the
    OpenX JSON SerDe, and runs the DDL via ``run_query`` (results written to
    ``S3_OUTPUT``).

    Args:
        database (str): Database that owns the table.
        table (str): Name of the table to create.
        s3_input (str): S3 folder URI containing the table's data. A trailing
            ``/`` is appended when missing, because Athena's LOCATION must name
            a folder, not an object.
        client: The Boto3 Athena client (optional, for testing).
    """
    # AWS documents that a table LOCATION should be a folder ending in '/';
    # the module's S3_INPUT ('s3://cratebarrel/data') does not, so normalize.
    location = s3_input if s3_input.endswith('/') else f'{s3_input}/'
    query = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (
        `name` STRING,
        `test1` STRING,
        `test2` STRING,
        `test3` STRING,
        `test4` INT,
        `test5` STRING
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES ('serialization.format' = '1')
    LOCATION '{location}'
    TBLPROPERTIES ('has_encrypted_data'='false');
    """
    run_query(query, database, S3_OUTPUT, client)
def main():
    """Set up the database and table, then run the example queries.

    Every query writes its results to ``S3_OUTPUT``; the results are not
    fetched back or printed here.
    """
    create_database(DATABASE)
    create_table(DATABASE, TABLE, S3_INPUT)

    qualified_table = f"{DATABASE}.{TABLE}"
    # Illustrative filters only: swap in your real column names and conditions.
    example_queries = (
        f"SELECT * FROM {qualified_table} WHERE test1 = 'F';",
        f"SELECT * FROM {qualified_table} WHERE test4 > 30;",
    )
    for sql in example_queries:
        run_query(sql, DATABASE, S3_OUTPUT)
if __name__ == "__main__":
    # Only run the Athena setup and queries when executed as a script.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment