Python Google BigQuery Utility Functions
"""
pip install \
bigquery-schema-generator \
google-api-python-client \
pandas-gbq \
pandas
"""
import logging
import os
import re
import time
from collections import OrderedDict
from contextlib import closing
from copy import deepcopy
from dataclasses import dataclass, astuple, asdict
from functools import lru_cache, wraps
from io import StringIO
from itertools import chain
from pathlib import Path
from typing import List, Dict, Union, Tuple
import numpy as np
import pandas as pd
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.api_core.exceptions import NotFound as TableNotFound
from google.cloud import bigquery
from google.oauth2.service_account import Credentials
from pandas_gbq import to_gbq
MAX_COLUMN_NAME_LENGTH = 100
BAD_LETTER_PATT = re.compile(r"[^_\w]")
EXPECTED_EXC = {
    "FLOAT": "cannot safely cast non-equivalent float64 to int64",
    "STRING": "could not convert string to float",
}
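# These three environment variables must be set before the module is imported;
# os.environ[...] raises KeyError immediately if one is missing, rather than
# failing later inside a BigQuery call.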
PROJECT_ID = os.environ["GCP_PROJECT_ID"]
DATASET_ID = os.environ["GCP_DATASET_ID"]
CREDENTIALS_FILEPATH = os.environ["GCP_CREDENTIALS_FILEPATH"]
def creds():
    return Credentials.from_service_account_file(
        str(relative_to_project_root(CREDENTIALS_FILEPATH))
    )
def timeit():
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            start_time = time.time()
            res = func(*args, **kwargs)
            interval = time.time() - start_time
            print("Time for '%s': %0.3f seconds" % (func.__qualname__, interval))
            return res

        return inner

    return outer
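# Illustrative usage of timeit() (added example, not part of the original
# gist): decorating a function prints its wall-clock runtime on every call.
@timeit()
def _timeit_demo(seconds: float = 0.05) -> None:
    """Sleep briefly so the decorator has something to measure."""
    time.sleep(seconds)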
class ProjectRootNotFoundError(Exception):
    pass
def find_project_root() -> Path:
    try:
        parent = Path(__file__).parent
    except NameError:
        # __file__ is undefined in interactive sessions; fall back to the cwd.
        parent = Path(os.getcwd())
    while True:
        if (parent / ".projroot").exists():
            break
        child, parent = parent, parent.parent
        if "-" in child.name:
            continue
        if child == parent:
            # Reached the filesystem root without finding a marker. Note this
            # must be `==`, not `is`: pathlib's .parent returns a new object.
            raise ProjectRootNotFoundError()
        if parent.name.replace("-", "_") == child.name:
            break
    if not (parent / ".projroot").exists():
        (parent / ".projroot").touch()
    return parent
def relative_to_project_root(path: str) -> Path:
    if os.environ.get("PROJECT_ROOT") is not None:
        projroot = Path(os.environ["PROJECT_ROOT"]).absolute()
    else:
        projroot = find_project_root()
    return projroot / path
def get_file_size_in_bytes(fp) -> int:
    fp = relative_to_project_root(str(fp))
    return fp.stat().st_size
@dataclass
class GbqField:
    name: str
    type: str
    mode: str

    def __post_init__(self):
        self.name = fix_column_name_for_gbq(self.name)
@dataclass
class GbqSchema:
    fields: List[GbqField]

    def __post_init__(self):
        self.fields = sorted(self.fields, key=lambda f: (f.name, f.type, f.mode))

    def __getitem__(self, item):
        return next(f for f in self.fields if f.name == item)

    def as_series(self) -> pd.Series:
        index, values = zip(*map(astuple, self.fields))
        return pd.Series(values, index=index)

    def to_canonical(self) -> List[Dict[str, str]]:
        return [asdict(f) for f in self.fields]
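# Illustrative example (added, not part of the original gist): GbqField
# sanitizes its name in __post_init__, and GbqSchema keeps its fields sorted
# and supports lookup by (sanitized) field name. Wrapped in a function so it
# only runs once the whole module, including fix_column_name_for_gbq defined
# below, has been loaded.
def _gbq_schema_demo() -> GbqSchema:
    schema = GbqSchema(
        fields=[
            GbqField(name="Total $", type="FLOAT", mode="NULLABLE"),
            GbqField(name="ItemID", type="INTEGER", mode="REQUIRED"),
        ]
    )
    assert schema["ItemID"].type == "INTEGER"
    assert schema["Total__"].mode == "NULLABLE"  # " " and "$" replaced by "_"
    return schema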
@timeit()
def load_df(fp, allstrings, usecols=None):
    if isinstance(fp, (str, Path)):
        fp = relative_to_project_root(str(fp))
        assert get_file_size_in_bytes(fp) > 0, get_file_size_in_bytes(fp)
        fp = str(fp)
    df_: pd.DataFrame = pd.read_csv(
        fp,
        dtype=str if allstrings else None,
        keep_default_na=not allstrings,
        usecols=usecols,
    )
    print(f"Loaded dataframe with df_.shape={df_.shape}")
    return df_
def massage_filepath(fp: Union[str, Path]) -> Path:
    fp = str(fp)
    if ".csv" not in fp and "dbo." not in fp:
        fp = f"dbo.{fp}.csv"
    if "pdw/" not in fp:
        # noinspection PyUnresolvedReferences
        fp = f'pdw/{fp.lstrip("/")}'
    ret: Path = relative_to_project_root(str(fp))
    assert ret.exists(), ret
    return ret
@timeit()
@lru_cache(1000)
def genschema2(fp) -> GbqSchema:
    logging.basicConfig(level=logging.INFO)
    sg = SchemaGenerator(
        input_format="csv",
        infer_mode=True,
        keep_nulls=True,
        quoted_values_are_strings=False,  # TODO: ?
        debugging_interval=100_000,
    )
    schema = run_schema_generator(sg, fp)
    return GbqSchema(fields=[GbqField(**f) for f in schema])
@timeit()
@lru_cache(1)
def get_text(fp):
    return fp.read_text()
@timeit()
def deduce_schema(sg: SchemaGenerator, fin) -> tuple:
    return sg.deduce_schema(file=fin)
@timeit()
def run_schema_generator(sg: SchemaGenerator, fp: Path) -> dict:
    # with closing(sample_lines_from(fp)) as fin:
    with closing(StringIO(get_text(fp))) as fin:
        schema_map, error_logs = deduce_schema(sg, fin)
    for error in error_logs:
        print("Problem on line %s: %s" % (error["line"], error["msg"]))
    starting_schema: OrderedDict = sg.flatten_schema(schema_map)
    schema = deepcopy(starting_schema)
    for idx, schema_item in enumerate(starting_schema):
        mode = schema_item["mode"]
        name = schema_item["name"]
        type_ = schema_item["type"]
        assert mode in ("REQUIRED", "NULLABLE")
        assert type_ in (
            "DATE",
            "INTEGER",
            "STRING",
            "FLOAT",
            "TIMESTAMP",
            "BOOLEAN",
            "TIME",
        ), type_
        if type_ in ("STRING", "FLOAT"):
            downcasted = maybe_downcast_type(fp, name, type_)
            assert downcasted in ("STRING", "FLOAT", "INTEGER")
            schema[idx]["type"] = downcasted
    return schema
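# For reference (added note, not from the original gist): the flattened schema
# returned above is a list of plain dict entries shaped like
#     {"name": "ItemID", "type": "INTEGER", "mode": "REQUIRED"}
# which is exactly what GbqField(**f) in genschema2 expects.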
def maybe_downcast_type(fp, name, current_type):
    if isinstance(fp, str):
        fp = massage_filepath(fp)
    df_ = load_df(StringIO(get_text(fp)), usecols=[name], allstrings=True)
    assert current_type in ("STRING", "FLOAT"), current_type
    downcasted_type = "STRING"
    try:
        float_df = df_[name].replace("", np.nan).astype(float)
        downcasted_type = "FLOAT"
        # noinspection PyUnresolvedReferences
        float_df.astype(pd.Int64Dtype())
        downcasted_type = "INTEGER"
    except ValueError as e:
        assert current_type == "STRING", current_type
        _expexc = EXPECTED_EXC["STRING"]
        assert _expexc in repr(e), (name, current_type)
    except TypeError as e:
        _expexc = EXPECTED_EXC["FLOAT"]
        assert _expexc in repr(e), (name, current_type)
    if current_type == "FLOAT":
        assert downcasted_type in ("FLOAT", "INTEGER")
    return downcasted_type
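# The function above tries progressively narrower types (added note, not from
# the original gist): a column starts as STRING; a successful cast to float
# upgrades it to FLOAT; a further safe cast to the nullable Int64 dtype
# upgrades it to INTEGER. The messages in EXPECTED_EXC distinguish "genuinely
# a string" (ValueError) from "a float that cannot be an integer" (TypeError).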
def get_filepaths() -> List[Path]:
    return sorted(relative_to_project_root("pdw").glob("*.csv"))
def get_tablename(fp: Path) -> str:
    fp_name_parts = fp.name.split(".")
    assert len(fp_name_parts) == 3, fp_name_parts
    return fp_name_parts[1]
def test__maybe_downcast_type():
    expecteds = [
        ("DecimalValue", "FLOAT"),
        ("ParentActionID", "INTEGER"),
        ("ItemTypeID", "INTEGER"),
        ("GroupID", "INTEGER"),
        ("TextValue", "STRING"),
        ("Y_LastModifiedUserID", "INTEGER"),
        ("StockDescription", "STRING"),
    ]
    fp_ = "pdw/dbo.v_ItemActionsDetails.csv"
    for colname, expected in expecteds:
        act = maybe_downcast_type(fp_, colname, "STRING")
        assert act == expected, (act, expected)
def fix_column_name_for_gbq(name: str) -> str:
    return BAD_LETTER_PATT.sub("_", name)[:MAX_COLUMN_NAME_LENGTH]
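# Illustrative behavior of the sanitizer above (examples added, not from the
# original gist): any character that is not a word character (letter, digit,
# or "_") becomes "_", and the result is truncated to MAX_COLUMN_NAME_LENGTH.
#
#     fix_column_name_for_gbq("Total $ (USD)")  # -> "Total____USD_"
#     fix_column_name_for_gbq("order-date")     # -> "order_date"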
@timeit()
def fix_column_names_for_gbq(df_: pd.DataFrame) -> pd.DataFrame:
    return df_.rename(columns=fix_column_name_for_gbq)
def enable_verbose_logging():
    logger = logging.getLogger("pandas_gbq")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())
@timeit()
def send_to_bigquery(
    df_: pd.DataFrame,
    verbose: bool,
    destination_dataset: str,
    destination_table: str,
    chunksize=None,
    location="US",
    table_schema=None,
):
    df_ = fix_column_names_for_gbq(df_)
    if verbose:
        enable_verbose_logging()
    if table_schema:
        df_ = ensure_schema_types(df_, table_schema)

    @timeit()
    def _to_gbq():
        print("Starting to_gbq...")
        to_gbq(
            df_,
            destination_table=".".join((destination_dataset, destination_table)),
            project_id=PROJECT_ID,
            chunksize=chunksize,
            if_exists="replace",
            location=location,
            progress_bar=True,
            credentials=creds(),
            table_schema=table_schema,
        )

    _to_gbq()
@timeit()
def ensure_schema_types(df_, schema_):
    for item in schema_:
        colname = item["name"]
        coltype = item["type"]
        if coltype in ("FLOAT", "INTEGER"):
            df_[colname] = df_[colname].replace("", np.nan).astype(float)
            if coltype == "INTEGER":
                # noinspection PyUnresolvedReferences
                df_[colname] = df_[colname].astype(pd.Int64Dtype())
        # Convert to string, with special case: np.nan --> ''
        df_[colname] = np.where(
            pd.isnull(df_[colname]), "", df_[colname].astype(str)
        )
    return df_
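# Worked example (added note, not from the original gist): for an INTEGER
# column, the string "5.0" becomes 5.0 (float), then 5 (nullable Int64), then
# the string "5"; an empty string survives the round trip as "" via np.where.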
@timeit()
def load_into_bigquery(fp):
    fp = massage_filepath(fp)
    tablename = get_tablename(fp)
    theschema = genschema2(fp)
    print(theschema)
    send_to_bigquery(
        load_df(fp, allstrings=True),
        verbose=True,
        chunksize=50000,
        destination_dataset="pdw",
        destination_table=tablename,
        table_schema=theschema.to_canonical(),
    )
@timeit()
def gbq_fetch_table_schema(tablename):
    client = bigquery.Client(credentials=creds())
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{tablename}"
    table = client.get_table(table_id)
    print(f"Got table '{table.project}.{table.dataset_id}.{table.table_id}'")
    # View table properties
    print(f"Table schema: {table.schema}")
    print(f"Table description: {table.description}")
    print(f"Table has {table.num_rows} rows")
    return GbqSchema(
        fields=[
            GbqField(name=f.name, type=f.field_type, mode=f.mode) for f in table.schema
        ]
    )
def compare_schemas(s1, s2) -> Dict[str, Tuple[GbqField, GbqField]]:
    names = set(chain((f.name for f in s1.fields), (f.name for f in s2.fields)))
    differences = {}
    for name in names:
        if s1[name] != s2[name]:
            print(s1[name], s2[name], sep="\n", end="\n\n")
            assert name not in differences
            differences[name] = (s1[name], s2[name])
    return differences
@timeit()
def genschema2_matches_existing_bq_schema(tablename) -> bool:
    print(f"Running genschema2_matches_existing_bq_schema for tablename={tablename}")
    try:
        act_schema = gbq_fetch_table_schema(tablename)
    except TableNotFound:
        return False
    exp_schema = genschema2(massage_filepath(tablename))
    diffs = compare_schemas(act_schema, exp_schema)
    if diffs:
        return False
    else:
        return True
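# Usage sketch (an assumption about intended use, not part of the original
# gist): walk every "pdw/dbo.<Table>.csv" under the project root and (re)load
# any table whose existing BigQuery schema no longer matches the schema
# inferred from the CSV on disk.
if __name__ == "__main__":
    for csv_fp in get_filepaths():
        table = get_tablename(csv_fp)
        if not genschema2_matches_existing_bq_schema(table):
            load_into_bigquery(csv_fp)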