Skip to content

Instantly share code, notes, and snippets.

@khadersyed
Created February 21, 2023 20:25
Show Gist options
  • Save khadersyed/a1806b63168779f6246f6498b190a8d7 to your computer and use it in GitHub Desktop.
Save khadersyed/a1806b63168779f6246f6498b190a8d7 to your computer and use it in GitHub Desktop.
Simple S3/Glue/Athena Script
import json
import logging
import time

import boto3
from botocore.exceptions import ClientError
def create_s3_bucket(s3_bucket_name, region=None):
    """Create an S3 bucket and return its location.

    Args:
        s3_bucket_name: Name of the bucket to create.
        region: Optional AWS region for the bucket (e.g. "eu-west-1"). When
            omitted, the client's default configuration is used and no
            location constraint is sent, exactly as before.

    Returns:
        The "Location" value from the create_bucket response, or False when
        the service rejects the request (the error is logged).
    """
    client_kwargs = {} if region is None else {"region_name": region}
    bucket_kwargs = {"Bucket": s3_bucket_name}
    # S3 requires an explicit LocationConstraint for every region except
    # us-east-1, where sending one is rejected.
    if region not in (None, "us-east-1"):
        bucket_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

    s3_client = boto3.client("s3", **client_kwargs)
    try:
        response = s3_client.create_bucket(**bucket_kwargs)
    except ClientError as e:
        logging.error(e)
        return False
    return response["Location"]
def create_iam_policy(s3_bucket_name):
    """Create the IAM policy granting crawler access to one S3 bucket.

    The policy allows listing the bucket and reading/writing its objects.

    Args:
        s3_bucket_name: Name of the bucket the policy applies to.

    Returns:
        The ARN of the created policy.
    """
    list_bucket_statement = {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{s3_bucket_name}"],
    }
    object_access_statement = {
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject"],
        "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"],
    }
    policy_document = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [list_bucket_statement, object_access_statement],
        }
    )

    iam_client = boto3.client("iam")
    created = iam_client.create_policy(
        PolicyName="kh-glueS3CrawlerPolicy",
        PolicyDocument=policy_document,
    )
    return created["Policy"]["Arn"]
def create_iam_role(role_name):
    """Create an IAM role that the Glue service is allowed to assume.

    Args:
        role_name: Name to give the new role.

    Returns:
        The role name reported back by IAM in the create_role response.
    """
    glue_trust_statement = {
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }
    trust_policy = {"Version": "2012-10-17", "Statement": [glue_trust_statement]}

    iam_client = boto3.client("iam")
    created = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    return created["Role"]["RoleName"]
def attach_iam_policy(policy_arn, role_name):
    """Attach a policy to a role and print the raw IAM response.

    Args:
        policy_arn: ARN of the policy to attach.
        role_name: Name of the role receiving the policy.
    """
    iam_client = boto3.client("iam")
    print(iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn))
def create_glue_crawler(crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions):
    """Create a Glue crawler with a single S3 target and print the response.

    Args:
        crawler_name: Name of the crawler to create.
        iam_role_name: Role passed to Glue as the crawler's ``Role``.
        db_name: Glue database passed as the crawler's ``DatabaseName``.
        s3_path: S3 path the crawler targets.
        s3_path_exclusions: Exclusion patterns for that S3 target.
    """
    s3_target = {"Path": s3_path, "Exclusions": s3_path_exclusions}
    crawler_settings = {
        "Name": crawler_name,
        "Role": iam_role_name,
        "DatabaseName": db_name,
        "Targets": {"S3Targets": [s3_target]},
    }
    glue = boto3.client("glue")
    print(glue.create_crawler(**crawler_settings))
def start_crawler(crawler_name):
    """Start the named Glue crawler and print the raw Glue response.

    Args:
        crawler_name: Name of an existing crawler to start.
    """
    glue = boto3.client("glue")
    print(glue.start_crawler(Name=crawler_name))
def get_num_rows(database_name, table_name, output_location):
    """Start an Athena ``COUNT(*)`` query against a table.

    The query is only started here; this function does not wait for it or
    read its result.

    Args:
        database_name: Database containing the table.
        table_name: Table whose rows are counted.
        output_location: S3 location where Athena writes the query output.

    Returns:
        The QueryExecutionId of the started query.
    """
    athena = boto3.client("athena")
    print(output_location)
    count_sql = f"SELECT COUNT(*) from {database_name}.{table_name}"
    result_config = {"OutputLocation": output_location}
    started = athena.start_query_execution(
        QueryString=count_sql,
        ResultConfiguration=result_config,
    )
    return started["QueryExecutionId"]
def get_query_results(execution_id, poll_interval=1.0, max_wait=300.0):
    """Wait for an Athena query to finish, then return its result rows.

    Athena runs queries asynchronously, and asking for results while the
    query is still queued or running is rejected by the service. This
    function therefore polls the execution state before fetching.

    Args:
        execution_id: QueryExecutionId of a previously started query.
        poll_interval: Seconds to sleep between state checks.
        max_wait: Maximum number of seconds to keep polling. Once exceeded,
            the results are requested anyway so that the service's own
            error surfaces to the caller.

    Returns:
        The list of rows found under ``ResultSet.Rows`` in the response.

    Raises:
        botocore.exceptions.ClientError: If Athena rejects a request, for
            example because the query failed, was cancelled, or had not
            finished within ``max_wait``.
    """
    athena_client = boto3.client("athena")
    deadline = time.monotonic() + max_wait

    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=execution_id)
        state = execution["QueryExecution"]["Status"]["State"]
        # Any state other than QUEUED/RUNNING is terminal.
        if state not in ("QUEUED", "RUNNING"):
            break
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    response = athena_client.get_query_results(QueryExecutionId=execution_id)
    return response["ResultSet"]["Rows"]
def main():
    """Run the Athena row-count query of the walkthrough and print its rows.

    Only the query steps (7 and 8) execute. The provisioning steps (0-6)
    are kept as commented-out reference code.
    """
    s3_bucket_name = "khader-s3-glue-athena"
    glue_database_name = "khGlueDB"

    # NOTE(review): steps 0-6 were already disabled in the original script,
    # presumably because the resources had been created by an earlier run.
    # Confirm before re-enabling them.
    # role_name = "kh-glueS3CrawlerRole"
    # crawler_name = "kh-GlueCrawler"
    # # 0. Create S3 Bucket
    # print("Creating S3 Bucket..", end="")
    # s3_bucket = create_s3_bucket(s3_bucket_name=s3_bucket_name)
    # print(s3_bucket)
    # # 1. Create IAM Policy
    # print("Creating IAM policy")
    # policy_arn = create_iam_policy(s3_bucket_name=s3_bucket_name)
    # # 2. Create IAM Role
    # print("Creating IAM role")
    # role_name = create_iam_role(role_name=role_name)
    # # 3. Attach IAM policy
    # print("Attaching IAM policy")
    # attach_iam_policy(policy_arn=policy_arn, role_name=role_name)
    # # 4. Attach AWS Managed Role
    # service_role_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    # attach_iam_policy(policy_arn=service_role_arn, role_name=role_name)
    # # 5. Create Glue Crawler for path
    # print("Creating Glue crawler")
    # create_glue_crawler(
    #     crawler_name=crawler_name,
    #     iam_role_name=role_name,
    #     db_name=glue_database_name,
    #     s3_path=f"s3://{s3_bucket_name}/input",
    #     s3_path_exclusions=[],
    # )
    # # 6. Start crawler
    # print("Starting Glue crawler")
    # start_crawler(crawler_name=crawler_name)

    # 7. Make athena query
    # This step must run: it produces the execution_id consumed by step 8.
    # With it commented out, step 8 failed with NameError.
    database_name = glue_database_name
    table_name = "input"
    output_location = f"s3://{s3_bucket_name}/queries/"
    print("Querying athena:")
    execution_id = get_num_rows(
        database_name=database_name,
        table_name=table_name,
        output_location=output_location,
    )
    print(execution_id)

    # 8. Retrieve results
    print("retrieving results")
    print(get_query_results(execution_id=execution_id))
# Run the walkthrough only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment