Last active
July 17, 2023 10:08
-
-
Save OatmealLick/0035dbdbd466eeb0fe5b01d4325b5ba3 to your computer and use it in GitHub Desktop.
Retrieve data contracts from Cloud Storage and the table schema from BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def validate_table(data_product: str, table_id: str, env: str) -> (bool, List[str]):
    """Validate a BigQuery table's schema against its stored data contract.

    The contract is looked up in the ``astrafy-data-contracts-{env}`` bucket
    under ``{latest_version}/{data_product}/{table_name}`` and the comparison
    itself is delegated to ``_validate_schema_for_blob``.

    Args:
        data_product: Needed to know where to look for the table (which
            directory in the contracts bucket).
        table_id: BigQuery fully qualified table id. The segment after the
            last ``.`` is used as the table name.
        env: Chooses the 'dev' or 'prd' contracts bucket.

    Returns:
        ``(True, None)`` when no contract blob matches the table (a table
        without a contract is treated as valid); otherwise the result of
        ``_validate_schema_for_blob`` for the single matching blob.

    Raises:
        Exception: If the bucket contains no blobs at all, or if more than
            one blob matches the table's contract path.
    """
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
    actual_schema = bigquery_client.get_table(table_id).schema

    bucket = f"astrafy-data-contracts-{env}"
    blobs = list(storage_client.list_blobs(bucket))
    if not blobs:
        raise Exception(f"No blobs were found in bucket {bucket}")

    latest_version = _get_latest_version(blobs)
    table_name = table_id.split(".")[-1].strip()
    path_part = f"{latest_version}/{data_product}/{table_name}"
    # NOTE(review): this is a substring match on the blob id, so a table whose
    # name is a prefix of another table's name may match both blobs and hit
    # the "more than one blob" error below -- confirm against blob naming.
    blobs_filtered = [b for b in blobs if path_part in b.id]

    if not blobs_filtered:
        # If no schema is found it means that the current model does not have
        # a contract. This behaviour can change if all models are required to
        # have a contract.
        # Report the bucket that was searched; the filtered list is always
        # empty in this branch and carries no information.
        logging.warning(
            "No blobs of path %s found in bucket %s", path_part, bucket)
        return True, None

    if len(blobs_filtered) > 1:
        raise Exception(
            f"More than one blob of path {path_part} found in {blobs_filtered}")

    return _validate_schema_for_blob(blobs_filtered[0], actual_schema)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment