Last active
June 12, 2024 22:07
-
-
Save thomasvincent/0880d8d38e596d5442daaa58e9cfff74 to your computer and use it in GitHub Desktop.
Athena query in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Author: Thomas Vincent
Date: June 12, 2024
Gist: https://gist.github.com/thomasvincent/0880d8d38e596d5442daaa58e9cfff74
This script executes Athena queries to create and manage a database and table,
then runs additional queries against the data.
"""
import time

import boto3
# Constants (for clarity and maintainability)

# S3 "folder" holding the table's source data. It is used as the table's
# LOCATION; Athena's guidance is to give a folder path ending in '/'.
S3_INPUT = 's3://cratebarrel/data/'
# S3 location where Athena writes query result files.
S3_OUTPUT = 's3://athena-project/results/'
DATABASE = 'test_database'
TABLE = 'persons'
# Created once at import time because the functions below use it as their
# default client; boto3 needs a default AWS region configured to build it.
ATHENA_CLIENT = boto3.client('athena')
def run_query(query, database, s3_output, client=None, *,
              poll_interval=1.0, timeout=None):
    """Execute an Athena query and block until it finishes.

    boto3 provides no waiters for Athena, so completion is detected by
    polling ``get_query_execution`` until the query reaches a terminal
    state (SUCCEEDED, FAILED or CANCELLED).

    Args:
        query (str): The SQL query to execute.
        database (str): The database used as the query execution context.
        s3_output (str): The S3 location for query results.
        client: The Boto3 Athena client. Defaults to the module-level
            ``ATHENA_CLIENT``, looked up at call time so it can be replaced
            (e.g. in tests).
        poll_interval (float): Seconds to sleep between status checks.
        timeout (float | None): Maximum seconds to wait for completion;
            ``None`` waits indefinitely.

    Returns:
        str: The ID of the executed query.

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state.
        TimeoutError: If ``timeout`` elapses before the query finishes.
    """
    if client is None:
        client = ATHENA_CLIENT
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    execution_id = response['QueryExecutionId']
    print(f'Execution ID: {execution_id}')

    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = client.get_query_execution(
            QueryExecutionId=execution_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return execution_id
        if state in ('FAILED', 'CANCELLED'):
            # Surface Athena's explanation instead of failing silently.
            reason = status.get('StateChangeReason', 'no reason given')
            raise RuntimeError(
                f'Athena query {execution_id} {state}: {reason}'
            )
        # Any other state (QUEUED / RUNNING) means the query is still going.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f'Athena query {execution_id} still {state} after '
                f'{timeout} seconds'
            )
        time.sleep(poll_interval)
def create_database(database, client=ATHENA_CLIENT):
    """Ensure an Athena database named *database* exists.

    Sends an idempotent ``CREATE DATABASE IF NOT EXISTS`` statement through
    ``run_query``, with results written under ``S3_OUTPUT``.
    """
    ddl = "CREATE DATABASE IF NOT EXISTS {};".format(database)
    run_query(query=ddl, database=database, s3_output=S3_OUTPUT, client=client)
def create_table(database, table, s3_input, client=ATHENA_CLIENT):
    """Register an external JSON-backed Athena table over data in S3.

    The table ``database.table`` is created only if it does not already
    exist. It is read with the OpenX JSON SerDe from the S3 prefix
    *s3_input*. Query results are written under ``S3_OUTPUT``.
    """
    # Built line by line so the statement text matches the original exactly.
    ddl = (
        "\n"
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (\n"
        "`name` STRING,\n"
        "`test1` STRING,\n"
        "`test2` STRING,\n"
        "`test3` STRING,\n"
        "`test4` INT,\n"
        "`test5` STRING\n"
        ")\n"
        "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'\n"
        "WITH SERDEPROPERTIES ('serialization.format' = '1')\n"
        f"LOCATION '{s3_input}'\n"
        "TBLPROPERTIES ('has_encrypted_data'='false');\n"
    )
    run_query(query=ddl, database=database, s3_output=S3_OUTPUT, client=client)
def main():
    """Set up the database and table, then run the example filter queries."""
    create_database(DATABASE)
    create_table(DATABASE, TABLE, S3_INPUT)

    # Example WHERE clauses; swap in your real column names and conditions.
    example_filters = ("test1 = 'F'", "test4 > 30")
    for condition in example_filters:
        sql = f"SELECT * FROM {DATABASE}.{TABLE} WHERE {condition};"
        run_query(sql, DATABASE, S3_OUTPUT)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment