Amazon Athena Partition Projection synthetic data generator
@t04glovern, last active September 25, 2023
#!/usr/bin/env python
#
# pip install boto3
# ./partition-projection-synthetic-data.py --bucket <bucket-name>
#
# Uploads one day of synthetic gzipped JSONL telemetry to the given bucket and
# writes the following files locally:
# - partition-projection-create-table.sql - CREATE TABLE statement
# - partition-projection-query-table.sql - SQL to query the table
import argparse
import gzip
import io
import json
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Union

import boto3

logging.basicConfig(level=logging.INFO)

s3_client = boto3.client('s3')
def generate_random_json(
    id: str,
    timestamp: datetime,
    speed: float,
    temperature: float,
    location: Dict[str, float],
) -> Tuple[Dict[str, Union[str, float, Dict[str, float]]], float, float, Dict[str, float]]:
    # Perturb the previous readings slightly so consecutive records look like
    # a sensor drifting over time. Note that `location` is mutated in place.
    speed += random.randint(-5, 5)
    temperature = round(temperature + random.uniform(-0.5, 0.5), 2)
    location['lat'] += random.uniform(-0.0001, 0.0001)
    location['lng'] += random.uniform(-0.0001, 0.0001)
    return {
        "id": id,
        "timestamp": timestamp.isoformat(),
        "speed": speed,
        "temperature": temperature,
        # Snapshot the dict: returning the shared reference would make every
        # record in a batch serialize with the same final coordinates, since
        # json.dumps only runs at upload time.
        "location": dict(location),
    }, speed, temperature, location
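
# A record produced above looks roughly like this (values will vary):
# {"id": "1", "timestamp": "2023-07-01T00:00:00", "speed": 52,
#  "temperature": 25.31, "location": {"lat": -31.976, "lng": 115.9113}}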
def upload_to_s3(filename: str, data: List[Dict[str, Union[str, float, Dict[str, float]]]], bucket_name: str) -> None:
    # Write the records as gzip-compressed NDJSON (one JSON object per line),
    # entirely in memory, then upload the rewound buffer to S3.
    with io.BytesIO() as output:
        with gzip.GzipFile(fileobj=output, mode="w") as f:
            for entry in data:
                f.write(json.dumps(entry).encode())
                f.write(b'\n')
        output.seek(0)
        try:
            s3_client.upload_fileobj(output, bucket_name, filename)
            logging.info(f"Successfully uploaded {filename} to {bucket_name}")
        except Exception as e:
            logging.error(f"Failed to upload {filename} to {bucket_name}. Reason: {e}")
def write_partition_projection_query(bucket_name: str) -> None:
    sql_content = f"""CREATE EXTERNAL TABLE IF NOT EXISTS partition_projection_sample_data (
    `id` string,
    `timestamp` timestamp,
    `speed` int,
    `temperature` float,
    `location` struct<lat: float, lng: float>
)
PARTITIONED BY (
    device_id string,
    year int,
    month int,
    day int,
    hour int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ("timestamp.formats" = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSZZ")
LOCATION 's3://{bucket_name}/robocat/'
TBLPROPERTIES (
    "projection.enabled" = "true",
    "projection.device_id.type" = "injected",
    "projection.year.type" = "integer",
    "projection.year.range" = "2023,2033",
    "projection.month.type" = "integer",
    "projection.month.range" = "1,12",
    "projection.month.digits" = "2",
    "projection.day.type" = "integer",
    "projection.day.range" = "1,31",
    "projection.day.digits" = "2",
    "projection.hour.type" = "integer",
    "projection.hour.range" = "0,23",
    "projection.hour.digits" = "2",
    "storage.location.template" = "s3://{bucket_name}/robocat/${{device_id}}/year=${{year}}/month=${{month}}/day=${{day}}/hour=${{hour}}"
);
"""
    with open("partition-projection-create-table.sql", "w") as sql_file:
        sql_file.write(sql_content)
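
# Note: because "projection.device_id.type" is "injected", Athena cannot
# enumerate device IDs on its own: every query against this table must supply
# device_id values via an equality condition in its WHERE clause, as the
# sample query below does.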
def write_query() -> None:
    sql_content = """SELECT * FROM partition_projection_sample_data
WHERE device_id = '2'
  AND year = 2023
  AND month = 7
  AND day = 1
  AND hour = 14
"""
    with open("partition-projection-query-table.sql", "w") as sql_file:
        sql_file.write(sql_content)
def main() -> None:
    parser = argparse.ArgumentParser(description='AWS Athena - Partition Projection Synthetic Data Generator Tool')
    parser.add_argument('--bucket', required=True, help='The S3 bucket name to store generated data.')
    args = parser.parse_args()
    bucket_name = args.bucket

    start_date = datetime(2023, 7, 1)
    end_date = datetime(2023, 7, 2)
    current_date = start_date
    id_cycle = ["1", "2"]  # two synthetic devices, alternated per file
    current_id_idx = 0
    initial_conditions = {
        'speed': 50,
        'temperature': 25,
        'location': {'lat': -31.976056, 'lng': 115.9113084}
    }

    # current_date advances 10 minutes per file, so the inner loops walk it
    # through exactly one day per pass of the while loop, keeping the `hour`
    # loop variable in step with current_date.hour.
    while current_date < end_date:
        for hour in range(24):
            for _ in range(6):  # six files an hour (every 10 minutes)
                data = [generate_random_json(id_cycle[current_id_idx], current_date, **initial_conditions)[0] for _ in range(10)]
                s3_path = (
                    f"robocat/{id_cycle[current_id_idx]}"
                    f"/year={current_date.year}"
                    f"/month={current_date.month:02d}"
                    f"/day={current_date.day:02d}"
                    f"/hour={hour:02d}"
                    f"/{current_date.strftime('%Y-%m-%d_%H-%M-%S')}_{random.randint(0, 9999)}.jsonl.gz"
                )
                upload_to_s3(s3_path, data, bucket_name)
                current_id_idx = (current_id_idx + 1) % len(id_cycle)
                current_date += timedelta(minutes=10)

    write_partition_projection_query(bucket_name)
    write_query()


if __name__ == '__main__':
    main()
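
To actually run the two generated SQL files against Athena, something like the boto3 sketch below should work. The `default` database and the query-results `OutputLocation` are placeholder assumptions, not part of the gist; substitute your own.

#!/usr/bin/env python
# Minimal sketch: submit each generated SQL file to Athena and poll until the
# query reaches a terminal state. Database name and OutputLocation below are
# assumptions -- adjust to your environment.
import time

import boto3

athena = boto3.client('athena')


def run_sql_file(path: str, database: str = 'default',
                 output: str = 's3://<query-results-bucket>/') -> None:
    with open(path) as f:
        query = f.read()
    execution = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output},
    )
    query_id = execution['QueryExecutionId']
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            print(f"{path}: {state}")
            break
        time.sleep(1)


run_sql_file('partition-projection-create-table.sql')
run_sql_file('partition-projection-query-table.sql')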