python-slugify==6.1.2
import csv
import glob
import json
import re
import tempfile
from pathlib import Path
from typing import Dict
from zipfile import ZipFile

from slugify import slugify
# requires Python 3.10+ for glob.glob(..., root_dir=...)
cur_dir = Path('./billing-cur').absolute()

print("\nInspecting manifests")
data_files = []
manifest_columns_union: Dict[str, Dict[str, str]] = {}

# First I find manifests for each billing period
for period_last_manifest_path in glob.glob('*/*/*/*Manifest.json', root_dir=cur_dir, recursive=True):
    print(cur_dir / period_last_manifest_path)

    with open(cur_dir / period_last_manifest_path, 'r', encoding='utf-8') as fh:
        manifest_data = json.load(fh)

    # I want to aggregate columns from all versions;
    # if they've added something new, it will simply be considered empty in the old reports
    for column in manifest_data['columns']:
        manifest_columns_union[column['name']] = column

    # the manifest references the latest report version
    for key in manifest_data['reportKeys']:
        data_files.append(cur_dir / key)
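
# For context, the shape of a CUR manifest as this script reads it (inferred from the fields
# used above; abbreviated, illustrative values):
# {
#   "reportKeys": ["<period>/<assembly-id>/report-1.csv.zip", ...],
#   "columns": [{"category": "identity", "name": "TimeInterval", "type": "Interval"}, ...]
# }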
print("\nGenerating table schema") | |
csv_to_sql_columns = {} | |
sql_table_columns = {} | |
for column in manifest_columns_union.values(): | |
column_type: str | |
if column['type'] in ('DateTime'): | |
column_type = 'timestamptz' | |
elif column['type'] in ('BigDecimal', 'OptionalBigDecimal'): | |
column_type = 'numeric(37,20)' | |
elif column['type'] in ('String', 'OptionalString', 'Interval'): | |
column_type = "text" | |
else: | |
raise ValueError(f"Unexpected column type '{column['type']}'") | |
csv_name = f"{column['category']}/{column['name']}" | |
sql_name = slugify(re.sub(r'(?<!^)(?=[A-Z])', '_', csv_name).lower(), separator='_') | |
csv_to_sql_columns[csv_name] = sql_name | |
if sql_name == 'identity_time_interval': | |
sql_table_columns['identity_time_from'] = 'timestamptz' | |
sql_table_columns['identity_time_to'] = 'timestamptz' | |
else: | |
sql_table_columns[sql_name] = column_type | |
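
# Worked example of the name mapping above (not in the original gist):
#   "lineItem/UsageStartDate"
#   -> "line_Item/_Usage_Start_Date"   (underscore inserted before each inner capital)
#   -> "line_item/_usage_start_date"   (lowercased)
#   -> "line_item_usage_start_date"    (slugify collapses the '/' and '_' runs into '_')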

table_schema = "CREATE TABLE aws_cur (\n  {0}\n);\n".format(
    ",\n  ".join([f"{k} {v}" for k, v in sql_table_columns.items()])
)
Path('./out/schema.sql').write_text(table_schema, encoding='utf-8')
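
# The generated schema depends on which columns appear in your reports; for a typical CUR it
# starts roughly like this (column names assumed for illustration):
# CREATE TABLE aws_cur (
#   identity_line_item_id text,
#   identity_time_from timestamptz,
#   identity_time_to timestamptz,
#   ...
# );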
print("\nMerging data for import") | |
csv.register_dialect('csv', lineterminator='\n', delimiter=',', quotechar='"', doublequote=True, quoting=csv.QUOTE_MINIMAL) | |
with open(Path('./out/for-import.csv'), mode='wt', encoding='utf-8') as target_fh: | |
# I want to create a single big CSV with all the data for one-click manual import | |
csv_writer = csv.DictWriter(target_fh, fieldnames=sql_table_columns.keys(), dialect='csv') | |
csv_writer.writeheader() | |
for zip_path in data_files: | |
print(zip_path) | |
with tempfile.TemporaryDirectory() as temp_dir: | |
with ZipFile(zip_path, 'r') as zip_obj: | |
zip_obj.extractall(temp_dir) | |
for csv_path in glob.glob('**/*.csv', root_dir=temp_dir, recursive=True): | |
with open(Path(temp_dir) / csv_path, 'r', encoding='utf-8') as fh: | |
# The DictReader opens the file and reads the first line, and the result is then in fieldnames filed | |
csv_reader = csv.DictReader(fh, dialect='csv') | |
# The column names for each row is taken from fieldnames, so we'll change them to the target column names | |
csv_reader.fieldnames = [csv_to_sql_columns[col] for col in list(csv_reader.fieldnames)] | |
for source_row in csv_reader: | |
# Splitting the interval column into two was simpler | |
(identity_time_from, identity_time_to) = source_row['identity_time_interval'].split('/') | |
source_row['identity_time_from'] = identity_time_from | |
source_row['identity_time_to'] = identity_time_to | |
del source_row['identity_time_interval'] | |
source_row = {k: (v if v != '' else None) for k, v in source_row.items()} | |
csv_writer.writerow(source_row) | |
target_fh.flush() |
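
# Assumed usage, not part of the gist itself: create the table and bulk-load the merged CSV,
# e.g. with psql (COPY in CSV mode reads unquoted empty fields as NULL):
#   psql -f out/schema.sql
#   psql -c "\copy aws_cur from 'out/for-import.csv' with (format csv, header true)"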