Creates a sample Iceberg table in Athena allowing you to try out Iceberg easily. This script is geared towards people who are new to the AWS variety of Iceberg and keen to try some of the unique features of Iceberg.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
#
# pip install boto3
# ./lets-try-iceberg.py --bucket <bucket-name>
#
# Creates the following files:
# - 1-athena-iceberg-create-table.sql - CREATE TABLE statement
# - 2-athena-create-temp-table.sql - CREATE EXTERNAL TABLE statement
# - 3-insert-into-iceberg-from-temp-table.sql - INSERT INTO statement
# - 4-cleanup-temp-table.sql - DROP TABLE statement
# - 5-cleanup-iceberg-table.sql - DROP TABLE statement
import argparse | |
import logging | |
import random | |
from datetime import datetime | |
from typing import Dict, Tuple, Union | |
import gzip | |
import json | |
import boto3 | |
# Configure the root logger so the logging.info() progress messages are shown.
logging.basicConfig(level=logging.INFO)
# Region used for the S3 client, for bucket creation, and inside the Glue and
# CloudWatch Logs ARNs of the generated IAM policy.
aws_region: str = "us-west-2"
# Module-level S3 client shared by the bucket-creation and upload functions.
s3_client = boto3.client("s3", region_name=aws_region)
def generate_random_json(
    id: str,
    timestamp: datetime,
    speed: float,
    temperature: float,
    location: Dict[str, float],
) -> Tuple[
    Dict[str, Union[str, float, Dict[str, float]]], float, float, Dict[str, float]
]:
    """Build one sample record by applying small random jitter to the inputs.

    Args:
        id: Identifier stored verbatim under the ``"id"`` key.
        timestamp: Moment of the sample; stored as ``timestamp.isoformat()``.
        speed: Base speed; an integer offset in [-5, 5] is added.
        temperature: Base temperature; an offset in [-0.5, 0.5] is added and
            the result is rounded to two decimals.
        location: Mapping with ``"lat"`` and ``"lng"`` keys; each coordinate is
            shifted by an offset in [-0.0001, 0.0001]. The mapping passed in
            is NOT modified.

    Returns:
        A 4-tuple ``(record, speed, temperature, location)`` where ``record``
        is the JSON-serialisable dict and the remaining items are the jittered
        values. ``record["location"]`` and the returned ``location`` are the
        same dict object.

    Raises:
        KeyError: If ``location`` lacks a ``"lat"`` or ``"lng"`` key.
    """
    speed += random.randint(-5, 5)
    temperature = round(temperature + random.uniform(-0.5, 0.5), 2)

    # Jitter a copy: updating the argument in place would silently change a
    # dict the caller may still be using for something else.
    location = dict(location)
    location["lat"] += random.uniform(-0.0001, 0.0001)
    location["lng"] += random.uniform(-0.0001, 0.0001)

    record = {
        "id": id,
        "timestamp": timestamp.isoformat(),
        "speed": speed,
        "temperature": temperature,
        "location": location,
    }
    return record, speed, temperature, location
def generate_and_upload_jsonl(bucket_name: str, sample_count: int = 1000000):
    """Write random samples to a local gzip JSONL file and upload it to S3.

    The file ``samples.jsonl.gz`` is created in the current working directory
    with one JSON document per line, then uploaded to
    ``s3://<bucket_name>/sample-data/samples.jsonl.gz``.

    Args:
        bucket_name: Destination S3 bucket.
        sample_count: Number of JSON lines to generate.
    """
    gzip_filename = "samples.jsonl.gz"
    object_key = f"sample-data/{gzip_filename}"

    with gzip.open(gzip_filename, "wt", encoding="UTF-8") as archive:
        for index in range(sample_count):
            # Only the record (first tuple item) is needed; the jittered
            # speed/temperature/location values are not carried forward.
            record = generate_random_json(
                id=str(index % 5 + 1),  # Cycling through IDs 1-5
                timestamp=datetime.now(),
                speed=random.randint(0, 100),
                temperature=random.uniform(-20, 40),
                location={
                    "lat": random.uniform(-90, 90),
                    "lng": random.uniform(-180, 180),
                },
            )[0]
            archive.write(json.dumps(record) + "\n")

    with open(gzip_filename, "rb") as payload:
        s3_client.upload_fileobj(payload, bucket_name, object_key)

    logging.info(f"Uploaded {gzip_filename} to s3://{bucket_name}/{object_key}")
def create_bucket(bucket_name: str):
    """Make sure the S3 bucket exists, creating it in ``aws_region`` if missing.

    Args:
        bucket_name: Name of the bucket to look up or create.

    Raises:
        botocore.exceptions.ClientError: If ``head_bucket`` fails for any
            reason other than "not found" (for example 403 when the bucket
            exists but is not accessible), or if ``create_bucket`` fails.
    """
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except s3_client.exceptions.ClientError as err:
        error_code = str(err.response.get("Error", {}).get("Code", ""))
        # Only a "not found" answer means the bucket should be created; any
        # other failure (permissions, throttling, ...) must surface as-is
        # instead of being masked by a doomed create attempt.
        if error_code not in {"404", "NoSuchBucket", "NotFound"}:
            raise
        logging.info(f"Bucket {bucket_name} does not exist, creating it...")
        create_kwargs = {"Bucket": bucket_name}
        # us-east-1 is the default location and must not be passed as an
        # explicit LocationConstraint.
        if aws_region != "us-east-1":
            create_kwargs["CreateBucketConfiguration"] = {
                "LocationConstraint": aws_region
            }
        s3_client.create_bucket(**create_kwargs)
    else:
        logging.info(f"Bucket {bucket_name} already exists, using it...")
def create_iceberg_query(bucket_name: str):
    """Write the Iceberg ``CREATE TABLE`` statement to a SQL file.

    The statement is written to ``1-athena-iceberg-create-table.sql`` in the
    current working directory and points the table location at the root of
    ``bucket_name``.

    Args:
        bucket_name: S3 bucket used in the table's ``LOCATION`` clause.
    """
    sql_content = f"""CREATE TABLE IF NOT EXISTS lets_try_iceberg_compaction (
    `id` string,
    `timestamp` timestamp,
    `speed` int,
    `temperature` float,
    `location` struct < lat: float, lng: float >
)
PARTITIONED BY (
    id
)
LOCATION 's3://{bucket_name}/'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format'='parquet',
    'write_compression'='zstd'
);
"""
    with open(
        "1-athena-iceberg-create-table.sql", "w", encoding="utf-8"
    ) as sql_file:
        sql_file.write(sql_content)
def create_athena_temp_table_sql(bucket_name: str):
    """Write the ``CREATE EXTERNAL TABLE`` statement for the raw sample data.

    The statement is written to ``2-athena-create-temp-table.sql`` in the
    current working directory. The table it defines reads JSON from the
    ``sample-data/`` prefix of ``bucket_name``.

    Args:
        bucket_name: S3 bucket used in the table's ``LOCATION`` clause.
    """
    sql_content = f"""CREATE EXTERNAL TABLE IF NOT EXISTS lets_try_iceberg_compaction_sample_data (
    `id` string,
    `timestamp` timestamp,
    `speed` int,
    `temperature` float,
    `location` struct<lat:float, lng:float>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ( "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ" )
LOCATION 's3://{bucket_name}/sample-data/'
"""
    with open("2-athena-create-temp-table.sql", "w", encoding="utf-8") as sql_file:
        sql_file.write(sql_content)
def create_insert_from_temp_to_iceberg_sql(bucket_name: str):
    """Write the ``INSERT INTO`` statement that copies sample data into Iceberg.

    The statement is written to ``3-insert-into-iceberg-from-temp-table.sql``
    in the current working directory.

    Args:
        bucket_name: Not used by the statement; kept so the signature matches
            the other SQL-generating functions and existing callers.
    """
    # Plain string: the statement has no placeholders, so no f-string needed.
    sql_content = (
        "INSERT INTO lets_try_iceberg_compaction\n"
        "SELECT * FROM lets_try_iceberg_compaction_sample_data\n"
    )
    with open(
        "3-insert-into-iceberg-from-temp-table.sql", "w", encoding="utf-8"
    ) as sql_file:
        sql_file.write(sql_content)
def create_cleanup_table_sql():
    """Write the two ``DROP TABLE`` clean-up statements to SQL files.

    ``4-cleanup-temp-table.sql`` drops the external sample-data table and
    ``5-cleanup-iceberg-table.sql`` drops the Iceberg table. Both files are
    created in the current working directory.
    """
    # Plain strings: neither statement has placeholders.
    statements = {
        "4-cleanup-temp-table.sql": (
            "DROP TABLE IF EXISTS lets_try_iceberg_compaction_sample_data;\n"
        ),
        "5-cleanup-iceberg-table.sql": (
            "DROP TABLE IF EXISTS lets_try_iceberg_compaction;\n"
        ),
    }
    for filename, sql_content in statements.items():
        with open(filename, "w", encoding="utf-8") as sql_file:
            sql_file.write(sql_content)
def _iam_role_exists(iam_client, role_name: str) -> bool:
    """Return True if ``role_name`` appears on any page of ``list_roles``."""
    pages = iam_client.get_paginator("list_roles").paginate()
    return any(
        role["RoleName"] == role_name for page in pages for role in page["Roles"]
    )


def _find_local_policy_arn(iam_client, policy_name: str):
    """Return the ARN of the customer-managed policy ``policy_name``, or None."""
    pages = iam_client.get_paginator("list_policies").paginate(Scope="Local")
    for page in pages:
        for policy in page["Policies"]:
            if policy["PolicyName"] == policy_name:
                return policy["Arn"]
    return None


def _compaction_policy_document(bucket_name: str, account_id: str) -> dict:
    """Build the policy document granting S3, Glue and CloudWatch Logs access."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
            },
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket_name}"],
            },
            {
                "Effect": "Allow",
                "Action": ["glue:UpdateTable", "glue:GetTable"],
                "Resource": [
                    f"arn:aws:glue:{aws_region}:{account_id}:table/default/lets_try_iceberg_compaction",
                    f"arn:aws:glue:{aws_region}:{account_id}:database/default",
                    f"arn:aws:glue:{aws_region}:{account_id}:catalog",
                ],
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": f"arn:aws:logs:{aws_region}:{account_id}:log-group:/aws-glue/iceberg-compaction/logs:*",
            },
        ],
    }


def create_iam_role_and_policy_iceberg_compaction(bucket_name: str):
    """Create the Glue compaction role and policy if missing, then attach them.

    The role ``lets-try-iceberg-compaction-role`` trusts ``glue.amazonaws.com``.
    The policy ``lets-try-iceberg-compaction-policy`` allows object access to
    ``bucket_name``, Glue table get/update on
    ``default.lets_try_iceberg_compaction`` and writing to the
    ``/aws-glue/iceberg-compaction/logs`` log group.

    Existence checks walk every page of the IAM listings, so roles or policies
    beyond the first page are found too. The policy is attached to the role
    whether it was just created or already existed, so a run that was
    interrupted between "create" and "attach" is repaired on the next run.

    Args:
        bucket_name: S3 bucket the policy grants access to.
    """
    iam_client = boto3.client("iam")
    sts_client = boto3.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]

    role_name = "lets-try-iceberg-compaction-role"
    policy_name = "lets-try-iceberg-compaction-policy"

    if _iam_role_exists(iam_client, role_name):
        logging.info(f"IAM role {role_name} already exists")
    else:
        assume_role_policy_document = json.dumps(
            {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {"Service": "glue.amazonaws.com"},
                        "Action": "sts:AssumeRole",
                    }
                ],
            }
        )
        iam_client.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_role_policy_document
        )
        logging.info(f"Created IAM role {role_name}")

    policy_arn = _find_local_policy_arn(iam_client, policy_name)
    if policy_arn is None:
        policy = iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(
                _compaction_policy_document(bucket_name, account_id)
            ),
        )
        policy_arn = policy["Policy"]["Arn"]
        logging.info(f"Created IAM policy {policy_name}")
    else:
        logging.info(f"IAM policy {policy_name} already exists")

    # NOTE(review): relies on AttachRolePolicy succeeding when the policy is
    # already attached to the role - confirm against the IAM API reference.
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
def main():
    """Parse the command line and run every setup step in order.

    When ``--bucket`` is not supplied, a bucket name of the form
    ``iceberg-sample-data-<six digits>`` is generated.
    """
    cli = argparse.ArgumentParser(description="Iceberg - Sample Table Creation")
    cli.add_argument(
        "--bucket",
        type=str,
        help="The S3 bucket name to store generated data. If not provided, a random bucket name will be generated.",
        required=False,
    )
    options = cli.parse_args()

    # The random suffix is only drawn when no bucket was given on the CLI.
    bucket_name = options.bucket or (
        f"iceberg-sample-data-{random.randint(100000, 999999)}"
    )

    bucket_steps = (
        create_bucket,
        create_iceberg_query,
        generate_and_upload_jsonl,
        create_athena_temp_table_sql,
        create_insert_from_temp_to_iceberg_sql,
        create_iam_role_and_policy_iceberg_compaction,
    )
    for step in bucket_steps:
        step(bucket_name)

    create_cleanup_table_sql()
# Run the workflow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment