Amazon Athena Partition Projection synthetic data generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# pip install boto3
# ./partition-projection-synthetic-data.py --bucket <bucket-name>
#
# Uploads gzipped JSON Lines sample data to s3://<bucket-name>/robocat/ and
# writes the following files to the current directory:
# - partition-projection-create-table.sql - CREATE TABLE statement
# - partition-projection-query-table.sql - SQL to query the table
import logging | |
import random | |
import json | |
import gzip | |
import io | |
from datetime import datetime, timedelta | |
from typing import Dict, Tuple, List, Union | |
import argparse | |
import boto3 | |
# Root logger at INFO so the per-file upload messages below are printed.
logging.basicConfig(level=logging.INFO)

# Module-wide S3 client used by upload_to_s3().  No region or credentials are
# passed explicitly, so boto3 resolves them from its default configuration.
s3_client = boto3.client(service_name='s3')
def generate_random_json(id: str, timestamp: datetime, speed: float, temperature: float, location: Dict[str, float]) -> Tuple[Dict[str, Union[str, float, Dict[str, float]]], float, float, Dict[str, float]]:
    """Build one synthetic telemetry record by randomly jittering the given readings.

    Args:
        id: Device identifier copied into the record's ``"id"`` field.  (The name
            shadows the ``id`` builtin; it is kept for keyword callers.)
        timestamp: Time of the reading, serialized with ``datetime.isoformat()``.
        speed: Previous speed; shifted by a random integer in [-5, 5].
        temperature: Previous temperature; shifted by a random float in
            [-0.5, 0.5] and rounded to 2 decimal places.
        location: Mapping with ``'lat'`` and ``'lng'`` keys; each coordinate is
            shifted by a random float in [-0.0001, 0.0001].  The mapping passed
            in is NOT modified.

    Returns:
        ``(record, speed, temperature, location)``: the JSON-serializable record
        plus the updated readings, so a caller can feed them into the next call.
    """
    new_speed = speed + random.randint(-5, 5)
    new_temperature = round(temperature + random.uniform(-0.5, 0.5), 2)
    # Work on a copy: mutating the caller's dict made every record built from
    # the same dict share one location object (all ending up identical).
    new_location = dict(location)
    new_location['lat'] += random.uniform(-0.0001, 0.0001)
    new_location['lng'] += random.uniform(-0.0001, 0.0001)
    record = {
        "id": id,
        "timestamp": timestamp.isoformat(),
        "speed": new_speed,
        "temperature": new_temperature,
        # Own copy, so later changes to the returned state cannot alter
        # a record that has already been built.
        "location": dict(new_location),
    }
    return record, new_speed, new_temperature, new_location
def upload_to_s3(filename: str, data: List[Dict[str, Union[str, float, Dict[str, float]]]], bucket_name: str) -> None:
    """Gzip ``data`` as JSON Lines in memory and upload it to ``s3://bucket_name/filename``.

    Each entry is serialized with ``json.dumps`` and followed by a newline.
    Upload failures are logged at ERROR level and not re-raised, so the caller
    simply moves on to its next file.
    """
    buffer = io.BytesIO()
    with buffer:
        with gzip.GzipFile(fileobj=buffer, mode="w") as gz:
            gz.write(b"".join(json.dumps(record).encode() + b"\n" for record in data))
        # Rewind so the upload reads the compressed bytes from the start.
        buffer.seek(0)
        try:
            s3_client.upload_fileobj(buffer, bucket_name, filename)
            logging.info(f"Successfully uploaded {filename} to {bucket_name}")
        except Exception as err:
            logging.error(f"Failed to upload {filename} to {bucket_name}. Reason: {err}")
def write_partition_projection_query(bucket_name: str):
    """Write the Athena CREATE TABLE statement to ``partition-projection-create-table.sql``.

    The file is created in the current working directory, overwriting any
    existing copy.  The table reads JSON from ``s3://<bucket_name>/robocat/``
    and enables partition projection: ``device_id`` is an injected partition,
    and year/month/day/hour are integer ranges (month/day/hour zero-padded to
    two digits) resolved through ``storage.location.template``.

    Args:
        bucket_name: S3 bucket holding the data; interpolated into both
            ``LOCATION`` and ``storage.location.template``.
    """
    # lat/lng are DOUBLE: a 32-bit FLOAT keeps only ~7 significant digits,
    # which truncates coordinates such as 115.9113084.
    # timestamp.formats: the first pattern (microseconds + offset) is the
    # original; the second matches what datetime.isoformat() yields for naive
    # datetimes with zero microseconds (e.g. 2023-07-01T00:00:00), which is
    # what this generator writes.
    # ${{...}} renders as a literal ${...} placeholder in the f-string.
    sql_content = f"""CREATE EXTERNAL TABLE IF NOT EXISTS partition_projection_sample_data (
`id` string,
`timestamp` timestamp,
`speed` int,
`temperature` float,
`location` struct < lat: double,
lng: double >
)
PARTITIONED BY (
device_id string,
year int,
month int,
day int,
hour int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ( "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ,yyyy-MM-dd'T'HH:mm:ss" )
LOCATION 's3://{bucket_name}/robocat/'
TBLPROPERTIES (
"projection.enabled" = "true",
"projection.device_id.type" = "injected",
"projection.year.type" = "integer",
"projection.year.range" = "2023,2033",
"projection.month.type" = "integer",
"projection.month.range" = "1,12",
"projection.month.digits" = "2",
"projection.day.type" = "integer",
"projection.day.range" = "1,31",
"projection.day.digits" = "2",
"projection.hour.type" = "integer",
"projection.hour.range" = "0,23",
"projection.hour.digits" = "2",
"storage.location.template" = "s3://{bucket_name}/robocat/${{device_id}}/year=${{year}}/month=${{month}}/day=${{day}}/hour=${{hour}}"
);
"""
    with open("partition-projection-create-table.sql", "w", encoding="utf-8") as sql_file:
        sql_file.write(sql_content)
def write_query():
    """Write a sample SELECT to ``partition-projection-query-table.sql`` in the working directory.

    The query filters on every partition column: device 2 on 2023-07-01, hour 14.
    """
    query_lines = [
        "SELECT * FROM partition_projection_sample_data",
        "WHERE device_id = '2'",
        "AND year = 2023",
        "AND month = 7",
        "AND day = 1",
        "AND hour = 14",
    ]
    with open("partition-projection-query-table.sql", "w") as out:
        out.write("\n".join(query_lines) + "\n")
def main() -> None:
    """Generate a day of synthetic telemetry, upload it to S3, and write the SQL files.

    For every 10-minute slot in [start_date, end_date) one file of 10 records is
    uploaded to
    ``s3://<bucket>/robocat/<device_id>/year=YYYY/month=MM/day=DD/hour=HH/``.
    Consecutive slots alternate between device ids "1" and "2".  The CREATE
    TABLE and sample query files are written afterwards.  ``--bucket`` is
    required.
    """
    parser = argparse.ArgumentParser(description='AWS Athena - Partition Projection Synthetic Data Generator Tool')
    # Without a bucket every upload would fail and the DDL would point at s3://None/.
    parser.add_argument('--bucket', required=True, help='The S3 bucket name to store generated data.')
    args = parser.parse_args()
    bucket_name = args.bucket

    start_date = datetime(2023, 7, 1)
    end_date = datetime(2023, 7, 2)
    interval = timedelta(minutes=10)
    records_per_file = 10
    id_cycle = ["1", "2"]

    # Baseline readings.  Speed and temperature are re-jittered around these
    # values for every record, so they stay in a plausible band.  Location is
    # carried forward, so it drifts slowly over time.
    baseline_speed = 50
    baseline_temperature = 25
    location = {'lat': -31.976056, 'lng': 115.9113084}

    current_date = start_date
    current_id_idx = 0
    while current_date < end_date:
        device_id = id_cycle[current_id_idx]
        data = []
        for _ in range(records_per_file):
            # Pass a copy so each record owns its own location dict, whether or
            # not generate_random_json mutates its argument.
            record, _, _, location = generate_random_json(
                device_id, current_date, baseline_speed, baseline_temperature, dict(location)
            )
            data.append(record)
        # Partition values come from the timestamp itself, so paths stay
        # correct for any start time or range length.
        s3_path = (
            f"robocat/{device_id}/year={current_date.year}/month={current_date.month:02d}"
            f"/day={current_date.day:02d}/hour={current_date.hour:02d}/"
            f"{current_date.strftime('%Y-%m-%d_%H-%M-%S')}_{random.randint(0,9999)}.jsonl.gz"
        )
        upload_to_s3(s3_path, data, bucket_name)
        current_id_idx = (current_id_idx + 1) % len(id_cycle)
        current_date += interval
    write_partition_projection_query(bucket_name)
    write_query()
# Run the generator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment