Skip to content

Instantly share code, notes, and snippets.

@OatmealLick
Last active July 17, 2023 10:08
Show Gist options
  • Save OatmealLick/0035dbdbd466eeb0fe5b01d4325b5ba3 to your computer and use it in GitHub Desktop.
Save OatmealLick/0035dbdbd466eeb0fe5b01d4325b5ba3 to your computer and use it in GitHub Desktop.
Retrieve data contracts from Cloud Storage and the table schema from BigQuery
def validate_table(data_product: str, table_id: str, env: str) -> (bool, List[str]):
    """Validate a BigQuery table's schema against its stored data contract.

    Fetches the table's current schema from BigQuery, lists the blobs in the
    ``astrafy-data-contracts-{env}`` bucket, and looks for the single blob
    whose id contains ``{latest_version}/{data_product}/{table_name}``, where
    ``latest_version`` is whatever ``_get_latest_version`` returns for the
    listed blobs.

    Args:
        data_product: Needed to know where to look for the table (which
            directory in the contracts bucket).
        table_id: BigQuery fully qualified table id. The part after the last
            dot (stripped of surrounding whitespace) is used as the table name.
        env: Chooses the 'dev' or 'prd' contracts bucket.

    Returns:
        ``(True, None)`` when no contract blob matches the table. Otherwise
        the result of ``_validate_schema_for_blob`` for the matching blob
        (presumably a validity flag plus a list of messages -- confirm
        against that helper).

    Raises:
        Exception: If the bucket contains no blobs at all, or if more than
            one blob matches the contract path.
    """
    storage_client = storage.Client()
    bigquery_client = bigquery.Client()
    actual_schema = bigquery_client.get_table(table_id).schema

    bucket = f"astrafy-data-contracts-{env}"
    blobs = list(storage_client.list_blobs(bucket))
    if not blobs:
        raise Exception(f"No blobs were found in bucket {bucket}")

    latest_version = _get_latest_version(blobs)
    table_name = table_id.split(".")[-1].strip()
    path_part = f"{latest_version}/{data_product}/{table_name}"
    # NOTE(review): this is a substring match on the blob id, so a table whose
    # name is a prefix of another table's name could match both blobs and
    # trip the "more than one blob" error below -- confirm the blob naming.
    blobs_filtered = [b for b in blobs if path_part in b.id]

    if not blobs_filtered:
        # If no schema is found it means the current model does not have a
        # contract. We can change this behaviour if we enforce all models to
        # have a contract.
        # Report the bucket that was searched; the filtered list is always
        # empty here, so logging it carries no information.
        logging.warning(
            "No blobs of path %s found in bucket %s", path_part, bucket
        )
        return True, None

    if len(blobs_filtered) > 1:
        raise Exception(
            f"More than one blob of path {path_part} found in {blobs_filtered}")

    return _validate_schema_for_blob(blobs_filtered[0], actual_schema)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment