@fprochazka
Created October 29, 2022 14:51
Dependencies:

python-slugify==6.1.2
import csv
import glob
import json
import re
import tempfile
from pathlib import Path
from typing import Dict
from zipfile import ZipFile
from slugify import slugify
cur_dir = Path('./billing-cur').absolute()
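# Expected layout under ./billing-cur mirrors the usual CUR S3 structure
# (an assumption based on the glob below, not something the gist states):
#   <report-path-prefix>/<report-name>/<yyyymmdd-yyyymmdd>/<report-name>-Manifest.json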
print("\nInspecting manifests")
data_files = []
manifest_columns_union: Dict[str, Dict[str, str]] = {}
# First I find manifests for each billing period
for period_last_manifest_path in glob.glob('*/*/*/*Manifest.json', root_dir=cur_dir, recursive=True):
    print(cur_dir / period_last_manifest_path)
    with open(cur_dir / period_last_manifest_path, 'r', encoding='utf-8') as fh:
        manifest_data = json.load(fh)

    # I want to aggregate columns from all versions;
    # if they've added something new, it will simply be considered empty in the old reports
    for column in manifest_data['columns']:
        manifest_columns_union[column['name']] = column

    # the manifest references the latest report version
    for key in manifest_data['reportKeys']:
        data_files.append(cur_dir / key)
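# For reference, the manifest fields consumed above look roughly like this
# (a trimmed, illustrative sketch, not a verbatim manifest):
# {
#     "columns": [{"category": "identity", "name": "TimeInterval", "type": "Interval"}, ...],
#     "reportKeys": ["<yyyymmdd-yyyymmdd>/<assembly-id>/<report-name>-1.csv.zip", ...]
# }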
print("\nGenerating table schema")
csv_to_sql_columns = {}
sql_table_columns = {}
for column in manifest_columns_union.values():
    column_type: str
    if column['type'] in ('DateTime',):
        column_type = 'timestamptz'
    elif column['type'] in ('BigDecimal', 'OptionalBigDecimal'):
        column_type = 'numeric(37,20)'
    elif column['type'] in ('String', 'OptionalString', 'Interval'):
        column_type = "text"
    else:
        raise ValueError(f"Unexpected column type '{column['type']}'")

    csv_name = f"{column['category']}/{column['name']}"
    sql_name = slugify(re.sub(r'(?<!^)(?=[A-Z])', '_', csv_name).lower(), separator='_')
    csv_to_sql_columns[csv_name] = sql_name

    if sql_name == 'identity_time_interval':
        sql_table_columns['identity_time_from'] = 'timestamptz'
        sql_table_columns['identity_time_to'] = 'timestamptz'
    else:
        sql_table_columns[sql_name] = column_type
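# The renaming above turns CUR's camelCase "category/name" pairs into snake_case
# SQL identifiers, e.g. (illustrative, using standard CUR column names):
#   'identity/TimeInterval'   -> 'identity_time_interval'
#   'lineItem/UsageStartDate' -> 'line_item_usage_start_date'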
table_schema = "CREATE TABLE aws_cur (\n {0}\n);\n".format(
    ",\n ".join([f"{k} {v}" for k, v in sql_table_columns.items()])
)

# make sure the output directory exists before writing into it
Path('./out').mkdir(parents=True, exist_ok=True)
Path('./out/schema.sql').write_text(table_schema, encoding='utf-8')
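# The resulting schema looks along these lines (the exact column set depends on
# which columns appear in your reports; these are illustrative):
#   CREATE TABLE aws_cur (
#    identity_line_item_id text,
#    identity_time_from timestamptz,
#    identity_time_to timestamptz,
#    bill_billing_period_start_date timestamptz,
#    line_item_usage_amount numeric(37,20),
#    ...
#   );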
print("\nMerging data for import")
csv.register_dialect('csv', lineterminator='\n', delimiter=',', quotechar='"', doublequote=True, quoting=csv.QUOTE_MINIMAL)
with open(Path('./out/for-import.csv'), mode='wt', encoding='utf-8') as target_fh:
    # I want to create a single big CSV with all the data for one-click manual import
    csv_writer = csv.DictWriter(target_fh, fieldnames=sql_table_columns.keys(), dialect='csv')
    csv_writer.writeheader()
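    # Note: DictWriter fills any fieldnames missing from a given row with its
    # restval (an empty string by default), which is what lets older report
    # versions simply leave the newer columns empty.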
    for zip_path in data_files:
        print(zip_path)
        with tempfile.TemporaryDirectory() as temp_dir:
            with ZipFile(zip_path, 'r') as zip_obj:
                zip_obj.extractall(temp_dir)

            for csv_path in glob.glob('**/*.csv', root_dir=temp_dir, recursive=True):
                with open(Path(temp_dir) / csv_path, 'r', encoding='utf-8') as fh:
                    # The DictReader reads the header line from the file, and the result ends up in its fieldnames field
                    csv_reader = csv.DictReader(fh, dialect='csv')
                    # The column names for each row are taken from fieldnames, so we'll change them to the target column names
                    csv_reader.fieldnames = [csv_to_sql_columns[col] for col in list(csv_reader.fieldnames)]
                    for source_row in csv_reader:
                        # Splitting the interval column into two was simpler
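                        # identity/TimeInterval values are ISO 8601 intervals,
                        # e.g. "2022-10-01T00:00:00Z/2022-11-01T00:00:00Z"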
                        (identity_time_from, identity_time_to) = source_row['identity_time_interval'].split('/')
                        source_row['identity_time_from'] = identity_time_from
                        source_row['identity_time_to'] = identity_time_to
                        del source_row['identity_time_interval']
                        source_row = {k: (v if v != '' else None) for k, v in source_row.items()}
                        csv_writer.writerow(source_row)
    target_fh.flush()
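
To actually load the result, feed out/schema.sql and out/for-import.csv to Postgres however you prefer. A minimal sketch using psycopg2 (psycopg2 and the connection string are my assumptions, not part of the gist; with FORMAT csv, COPY reads unquoted empty fields as NULL, which matches the empty-string handling above):

import psycopg2
from pathlib import Path

# "dbname=billing" is a placeholder; point this at your own database
with psycopg2.connect("dbname=billing") as conn:
    with conn.cursor() as cur:
        # create the aws_cur table from the generated schema
        cur.execute(Path('./out/schema.sql').read_text(encoding='utf-8'))
        # stream the merged CSV into it; HEADER tells COPY to skip the first line
        with open('./out/for-import.csv', 'r', encoding='utf-8') as fh:
            cur.copy_expert("COPY aws_cur FROM STDIN WITH (FORMAT csv, HEADER true)", fh)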