Skip to content

Instantly share code, notes, and snippets.

@t04glovern
Last active July 10, 2024 03:19
Show Gist options
  • Save t04glovern/04f6f2934353eb1d0fffd487e9b9b6a3 to your computer and use it in GitHub Desktop.
Save t04glovern/04f6f2934353eb1d0fffd487e9b9b6a3 to your computer and use it in GitHub Desktop.
Creates a sample Iceberg table in Athena allowing you to try out Iceberg easily. This script is geared towards people who are new to the AWS variety of Iceberg and keen to try some of the unique features of Iceberg.
#!/usr/bin/env python3
"""
This script generates sample data, uploads it to an S3 bucket, and creates Iceberg and Athena tables.
It also creates IAM roles and policies for compaction and statistics generation if specified.
Install:
python3 -m venv .venv
source .venv/bin/activate
pip3 install boto3
curl https://gist.githubusercontent.com/t04glovern/04f6f2934353eb1d0fffd487e9b9b6a3/raw \
> lets-try-iceberg.py \
&& chmod +x lets-try-iceberg.py
Usage:
./lets-try-iceberg.py --bucket <bucket-name> --table <table-name> --compaction --statistics
Output Files:
1-athena-iceberg-create-table.sql - CREATE TABLE statement
2-athena-create-temp-table.sql - CREATE EXTERNAL TABLE statement
3-insert-into-iceberg-from-temp-table.sql - INSERT INTO statement
4-cleanup-temp-table.sql - DROP TABLE statement
5-cleanup-iceberg-table.sql - DROP TABLE statement
"""
import argparse
import logging
import random
from datetime import datetime
from typing import Dict, Tuple, Union
import gzip
import json
import boto3
# Emit INFO-level messages so the progress logging in the helpers below is visible.
logging.basicConfig(level=logging.INFO)
# Region used for the S3 client, the bucket LocationConstraint, and the Glue/Logs ARNs.
aws_region: str = "us-west-2"
# Module-level S3 client shared by the bucket-creation and upload helpers.
s3_client = boto3.client("s3", region_name=aws_region)
def generate_random_json(
    id: str,
    timestamp: datetime,
    speed: float,
    temperature: float,
    location: Dict[str, float],
) -> Tuple[
    Dict[str, Union[str, float, Dict[str, float]]], float, float, Dict[str, float]
]:
    """Build one randomly jittered sample record.

    The incoming ``speed``, ``temperature`` and ``location`` are treated as the
    previous state; each is nudged by a small random amount and the new values
    are returned alongside the record so a caller can feed them into the next
    call.

    Args:
        id: Identifier stored verbatim in the record.
        timestamp: Moment of the sample; serialized with ``isoformat()``.
        speed: Previous speed; shifted by a random integer in ``[-5, 5]``.
        temperature: Previous temperature; shifted by a random amount in
            ``[-0.5, 0.5]`` and rounded to two decimals.
        location: Mapping with ``"lat"`` and ``"lng"`` keys; each coordinate is
            shifted by a random amount in ``[-0.0001, 0.0001]``. The mapping
            passed in is NOT modified.

    Returns:
        A ``(record, speed, temperature, location)`` tuple holding the
        JSON-serializable record and the updated state values. The location
        inside the record and the returned location are separate dict objects.

    Raises:
        KeyError: If ``location`` lacks a ``"lat"`` or ``"lng"`` key.
    """
    speed += random.randint(-5, 5)
    temperature = round(temperature + random.uniform(-0.5, 0.5), 2)

    # Work on a copy: mutating the caller's dict in place would silently change
    # any earlier record that still references the same object.
    moved = dict(location)
    moved["lat"] += random.uniform(-0.0001, 0.0001)
    moved["lng"] += random.uniform(-0.0001, 0.0001)

    record = {
        "id": id,
        "timestamp": timestamp.isoformat(),
        "speed": speed,
        "temperature": temperature,
        # Give the record its own snapshot so later changes to the returned
        # state cannot alter a record that has already been produced.
        "location": dict(moved),
    }
    return record, speed, temperature, moved
def generate_and_upload_jsonl(bucket_name: str, sample_count: int = 1000000):
    """Generate gzipped JSON-lines sample data and upload it to S3.

    Writes ``sample_count`` records to a local ``samples.jsonl.gz`` file, one
    JSON document per line, then uploads that file to
    ``s3://<bucket_name>/sample-data/samples.jsonl.gz`` using the module-level
    S3 client. The local file is left in place afterwards.

    Args:
        bucket_name: Destination S3 bucket.
        sample_count: Number of records to generate.
    """
    gzip_filename = "samples.jsonl.gz"
    object_key = f"sample-data/{gzip_filename}"

    with gzip.open(gzip_filename, "wt", encoding="UTF-8") as archive:
        for index in range(sample_count):
            # Every record starts from fresh random state; only the record
            # itself (first tuple element) is needed here.
            record = generate_random_json(
                id=str(index % 5 + 1),  # Cycling through IDs 1-5
                timestamp=datetime.now(),
                speed=random.randint(0, 100),
                temperature=random.uniform(-20, 40),
                location={
                    "lat": random.uniform(-90, 90),
                    "lng": random.uniform(-180, 180),
                },
            )[0]
            archive.write(json.dumps(record) + "\n")

    with open(gzip_filename, "rb") as payload:
        s3_client.upload_fileobj(payload, bucket_name, object_key)
    logging.info(f"Uploaded {gzip_filename} to s3://{bucket_name}/{object_key}")
def create_bucket(bucket_name: str):
    """Create the S3 bucket ``bucket_name`` unless it already exists.

    Existence is probed with ``head_bucket`` on the module-level S3 client.
    Only a "not found" response triggers creation; any other failure (for
    example a 403 because the bucket belongs to another account, or invalid
    credentials) is re-raised instead of being mistaken for a missing bucket.

    Args:
        bucket_name: Name of the bucket to look up or create.

    Raises:
        botocore.exceptions.ClientError: If ``head_bucket`` fails for a reason
            other than the bucket not existing, or if creation itself fails.
    """
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except s3_client.exceptions.ClientError as error:
        error_code = str(error.response.get("Error", {}).get("Code", ""))
        if error_code not in ("404", "NoSuchBucket", "NotFound"):
            # The bucket may exist but be inaccessible; creating it would only
            # fail later with a less helpful error.
            raise
        logging.info(f"Bucket {bucket_name} does not exist, creating it...")
        if aws_region == "us-east-1":
            # S3 rejects an explicit LocationConstraint for us-east-1.
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={"LocationConstraint": aws_region},
            )
    else:
        logging.info(f"Bucket {bucket_name} already exists, using it...")
def create_iceberg_query(bucket_name: str, table_name: str):
    """Write the Iceberg ``CREATE TABLE`` statement to a SQL file.

    The statement is saved as ``1-athena-iceberg-create-table.sql`` in the
    current working directory, overwriting any existing file.

    Args:
        bucket_name: Bucket whose root is used as the table ``LOCATION``.
        table_name: Name of the Iceberg table to create.
    """
    statement_lines = [
        f"CREATE TABLE IF NOT EXISTS {table_name} (",
        "`id` string,",
        "`timestamp` timestamp,",
        "`speed` int,",
        "`temperature` float,",
        "`location` struct < lat: float, lng: float >",
        ")",
        "PARTITIONED BY (",
        "id",
        ")",
        f"LOCATION 's3://{bucket_name}/'",
        "TBLPROPERTIES (",
        "'table_type'='ICEBERG',",
        "'format'='parquet',",
        "'write_compression'='zstd'",
        ");",
    ]
    with open("1-athena-iceberg-create-table.sql", "w") as out:
        # Each line, including the last, is newline-terminated.
        out.writelines(line + "\n" for line in statement_lines)
def create_athena_temp_table_sql(bucket_name: str, table_name: str):
    """Write the ``CREATE EXTERNAL TABLE`` statement for the raw sample data.

    The external table is named ``<table_name>_sample_data`` and points at
    ``s3://<bucket_name>/sample-data/``. The statement is saved as
    ``2-athena-create-temp-table.sql`` in the current working directory.

    Args:
        bucket_name: Bucket holding the uploaded sample data.
        table_name: Base table name; ``_sample_data`` is appended to it.
    """
    ddl = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}_sample_data (\n"
        "`id` string,\n"
        "`timestamp` timestamp,\n"
        "`speed` int,\n"
        "`temperature` float,\n"
        "`location` struct<lat:float, lng:float>\n"
        ")\n"
        "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
        'WITH SERDEPROPERTIES ( "timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss.SSSSSSZZ" )\n'
        f"LOCATION 's3://{bucket_name}/sample-data/'\n"
    )
    with open("2-athena-create-temp-table.sql", "w") as out:
        out.write(ddl)
def create_insert_from_temp_to_iceberg_sql(bucket_name: str, table_name: str):
    """Write the ``INSERT INTO`` statement that copies sample data into Iceberg.

    The statement selects everything from ``<table_name>_sample_data`` into
    ``<table_name>`` and is saved as
    ``3-insert-into-iceberg-from-temp-table.sql`` in the current working
    directory.

    Args:
        bucket_name: Not used by this function; accepted so the signature
            matches the other SQL generators.
        table_name: Name of the target Iceberg table.
    """
    source_table = f"{table_name}_sample_data"
    # The trailing empty element yields the final newline.
    statement = "\n".join(
        (f"INSERT INTO {table_name}", f"SELECT * FROM {source_table}", "")
    )
    with open("3-insert-into-iceberg-from-temp-table.sql", "w") as out:
        out.write(statement)
def create_cleanup_table_sql(table_name: str):
    """Write the two ``DROP TABLE`` cleanup scripts.

    ``4-cleanup-temp-table.sql`` drops ``<table_name>_sample_data`` and
    ``5-cleanup-iceberg-table.sql`` drops ``<table_name>``. Both files are
    written to the current working directory, in that order.

    Args:
        table_name: Name of the Iceberg table.
    """
    targets = (
        ("4-cleanup-temp-table.sql", f"{table_name}_sample_data"),
        ("5-cleanup-iceberg-table.sql", table_name),
    )
    for filename, doomed_table in targets:
        with open(filename, "w") as out:
            out.write(f"DROP TABLE IF EXISTS {doomed_table};\n")
def _ensure_glue_service_role(iam_client, role_name: str) -> None:
    """Create ``role_name`` with a Glue trust policy unless it already exists."""
    try:
        iam_client.get_role(RoleName=role_name)
    except iam_client.exceptions.NoSuchEntityException:
        assume_role_policy_document = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }]
        })
        iam_client.create_role(
            RoleName=role_name, AssumeRolePolicyDocument=assume_role_policy_document
        )
        logging.info(f"Created IAM role {role_name}")
    else:
        logging.info(f"IAM role {role_name} already exists")


def _delete_non_default_policy_versions(iam_client, policy_arn: str) -> None:
    """Delete every version of the managed policy except the default one."""
    policy_versions = iam_client.list_policy_versions(PolicyArn=policy_arn)
    for version in policy_versions["Versions"]:
        if not version["IsDefaultVersion"]:
            iam_client.delete_policy_version(
                PolicyArn=policy_arn, VersionId=version["VersionId"]
            )


def _upsert_managed_policy(
    iam_client, account_id: str, policy_name: str, policy_document: dict
) -> str:
    """Create the managed policy, or replace its default version; return its ARN."""
    policy_arn = f"arn:aws:iam::{account_id}:policy/{policy_name}"
    serialized_document = json.dumps(policy_document)

    # Only the lookup decides "exists or not"; a NoSuchEntity raised by a later
    # call must not be mistaken for a missing policy.
    try:
        iam_client.get_policy(PolicyArn=policy_arn)
    except iam_client.exceptions.NoSuchEntityException:
        created = iam_client.create_policy(
            PolicyName=policy_name, PolicyDocument=serialized_document
        )
        logging.info(f"Created IAM policy {policy_name}")
        return created["Policy"]["Arn"]

    logging.info(f"IAM policy {policy_name} already exists, updating it...")
    # IAM caps a managed policy at five versions, so free the slots first;
    # otherwise create_policy_version fails on a policy that is already full.
    _delete_non_default_policy_versions(iam_client, policy_arn)
    iam_client.create_policy_version(
        PolicyArn=policy_arn,
        PolicyDocument=serialized_document,
        SetAsDefault=True,
    )
    # The previous default is now non-default; remove it so only the new
    # version remains.
    _delete_non_default_policy_versions(iam_client, policy_arn)
    return policy_arn


def _shared_policy_statements(
    bucket_name: str, table_name: str, account_id: str
) -> list:
    """Return the S3 and Glue statements common to both generated policies."""
    return [
        # S3 permissions
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
            "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{bucket_name}"],
        },
        # Glue permissions
        {
            "Effect": "Allow",
            "Action": ["glue:UpdateTable", "glue:GetTable"],
            "Resource": [
                f"arn:aws:glue:{aws_region}:{account_id}:table/default/{table_name}",
                f"arn:aws:glue:{aws_region}:{account_id}:database/default",
                f"arn:aws:glue:{aws_region}:{account_id}:catalog",
            ],
        },
    ]


def create_iam_role_and_policy_iceberg_compaction(bucket_name: str, table_name: str):
    """Create or update the IAM role and policy used for Iceberg compaction.

    Ensures the role ``lets-try-iceberg-compaction-role`` exists (assumable by
    ``glue.amazonaws.com``), creates or refreshes the managed policy
    ``lets-try-iceberg-compaction-policy`` and attaches it to the role. The
    policy grants object and list access on the bucket, ``glue:GetTable`` /
    ``glue:UpdateTable`` on the table in the ``default`` database, and log
    writing under ``/aws-glue/iceberg-compaction/logs``.

    Args:
        bucket_name: Bucket the policy grants S3 access to.
        table_name: Glue table the policy grants access to.
    """
    iam_client = boto3.client("iam")
    sts_client = boto3.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    role_name = "lets-try-iceberg-compaction-role"
    policy_name = "lets-try-iceberg-compaction-policy"

    _ensure_glue_service_role(iam_client, role_name)

    policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            *_shared_policy_statements(bucket_name, table_name, account_id),
            # Logs permissions
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": f"arn:aws:logs:{aws_region}:{account_id}:log-group:/aws-glue/iceberg-compaction/logs:*",
            },
        ],
    }
    policy_arn = _upsert_managed_policy(
        iam_client, account_id, policy_name, policy_document
    )

    # Attach the policy to the role
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)


def create_iam_role_and_policy_statistics(bucket_name: str, table_name: str):
    """Create or update the IAM role and policy used for statistics generation.

    Ensures the role ``lets-try-iceberg-statistics-role`` exists (assumable by
    ``glue.amazonaws.com``), creates or refreshes the managed policy
    ``lets-try-iceberg-statistics-policy`` and attaches it to the role together
    with the AWS-managed ``AWSGlueServiceRole`` policy. The custom policy
    grants object and list access on the bucket, ``glue:GetTable`` /
    ``glue:UpdateTable`` on the table in the ``default`` database, log writing
    under ``/aws-glue``, and ``iam:PassRole`` on the role itself.

    Args:
        bucket_name: Bucket the policy grants S3 access to.
        table_name: Glue table the policy grants access to.
    """
    iam_client = boto3.client("iam")
    sts_client = boto3.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    role_name = "lets-try-iceberg-statistics-role"
    policy_name = "lets-try-iceberg-statistics-policy"

    _ensure_glue_service_role(iam_client, role_name)

    policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            *_shared_policy_statements(bucket_name, table_name, account_id),
            # Logs permissions
            {
                "Effect": "Allow",
                "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": [f"arn:aws:logs:{aws_region}:{account_id}:log-group:/aws-glue:*"]
            },
            # IAM permissions
            {
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": [f"arn:aws:iam::{account_id}:role/{role_name}"]
            },
        ],
    }
    policy_arn = _upsert_managed_policy(
        iam_client, account_id, policy_name, policy_document
    )

    # Attach the policy to the role
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    # Attach the AWSGlueServiceRole managed policy
    iam_client.attach_role_policy(
        RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    )
def main():
    """Parse the command line and generate the sample data, SQL files and IAM setup.

    Resolves the bucket (a random ``iceberg-sample-data-<6 digits>`` name when
    ``--bucket`` is omitted), makes sure it exists, writes the SQL scripts,
    uploads the generated sample data, and, when ``--compaction`` and/or
    ``--statistics`` are given, provisions the matching IAM role and policy.
    """
    parser = argparse.ArgumentParser(description="Iceberg - Sample Table Creation")
    parser.add_argument(
        "--bucket",
        type=str,
        required=False,
        help="The S3 bucket name to store generated data. If not provided, a random bucket name will be generated.",
    )
    # The two opt-in switches differ only in name and help text.
    for flag, description in (
        ("--compaction", "If provided, creates the compaction IAM role and policy."),
        ("--statistics", "If provided, creates the statistics IAM role and policy."),
    ):
        parser.add_argument(flag, action="store_true", help=description)
    parser.add_argument(
        "--table",
        type=str,
        required=False,
        default="lets_try_iceberg",
        help="The table name to use. If not provided, default to 'lets_try_iceberg'",
    )
    options = parser.parse_args()

    bucket_name = (
        options.bucket or f"iceberg-sample-data-{random.randint(100000, 999999)}"
    )
    create_bucket(bucket_name)
    table_name = options.table

    create_iceberg_query(bucket_name, table_name)
    generate_and_upload_jsonl(bucket_name)
    create_athena_temp_table_sql(bucket_name, table_name)
    create_insert_from_temp_to_iceberg_sql(bucket_name, table_name)

    optional_steps = (
        (options.compaction, create_iam_role_and_policy_iceberg_compaction),
        (options.statistics, create_iam_role_and_policy_statistics),
    )
    for enabled, provision in optional_steps:
        if enabled:
            provision(bucket_name, table_name)

    create_cleanup_table_sql(table_name)
# Run the generator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment