Skip to content

Instantly share code, notes, and snippets.

@fivetran-ezralee
Last active January 13, 2025 19:35
Show Gist options
  • Save fivetran-ezralee/f48ff0ae10737bbe7ebc2c97917c9601 to your computer and use it in GitHub Desktop.
Save fivetran-ezralee/f48ff0ae10737bbe7ebc2c97917c9601 to your computer and use it in GitHub Desktop.
# Iterate over the model records already listed in the gsheet, run table_diff for each one, and populate new gsheet fields with the resulting table_diff stats
from sqlglot import exp
# Walk every model listed in the sheet, run a SQLMesh table_diff between its
# source and target tables, and write the resulting stats into columns K..V of
# the same sheet row.
#
# Relies on names defined outside this block: `records`, `sheet`, `context`,
# `bq_client`, `parse_grain`, `is_table_empty`, `ast`, and the configuration
# constants PROJECT, SOURCE_SCHEMA_OVERRIDE, TARGET_SCHEMA_OVERRIDE,
# START_DATE and END_DATE.
for idx, record in enumerate(records, start=2):  # skip row 1 which is just headers
    # Bound before the `try` so the error handler can always label the failing
    # row, even when the failure happens before the model name has been read
    # (previously this raised NameError or reported the previous row's table).
    table_name = None
    try:
        # Column K (11th column, the `tables_totally_match` field) doubles as
        # the "already processed" marker: any non-empty value means table_diff
        # (or a skip reason) was already recorded for this row.
        cell_value = sheet.cell(idx, 11).value
        if cell_value:
            print(f"Row {idx}: Column K already has a value ('{cell_value}'), skipping this row.")
            continue

        # Parse all necessary data from the gsheet `record`.
        schema = record["schema_name"]
        table_name = record["model_name"]

        # Schema overrides, when set, replace the default `zz_*` schema names.
        if SOURCE_SCHEMA_OVERRIDE:
            source_table = f'{PROJECT}.{SOURCE_SCHEMA_OVERRIDE}.{table_name}'
        else:
            source_table = f'{PROJECT}.zz_{schema}.{table_name}'
        if TARGET_SCHEMA_OVERRIDE:
            target_table = f'{PROJECT}.{TARGET_SCHEMA_OVERRIDE}.{table_name}'
        else:
            target_table = f'{PROJECT}.zz_sqlmesh_{schema}.{table_name}'

        # Do not assign a grain if none is obvious. In cases of no grain,
        # accuracy and extent of table_diff are limited.
        # NOTE(review): `== False` only matches a real boolean/0; if the sheet
        # reader returns the string "FALSE" this check never fires — confirm.
        if (
            not record["unique_key"]
            or record["unique_key"].lower() == "no grain"
            or record["primary_keys_confirmed"] == False
        ):
            grain = None
        else:
            grain = parse_grain(record["unique_key"])
        print('grain: ', grain)

        # STRUCT/complex data type fields break table-diff, so they are
        # excluded from the comparison when the sheet lists any.
        skip_columns = ast.literal_eval(record["struct_fields"]) if record["struct_fields"] else None
        print('skip columns: ', skip_columns)

        # Restrict the comparison to a date window only when the model has a
        # date field and both window bounds are configured.
        if record["date_field"] and START_DATE and END_DATE:
            date_field = record["date_field"]
            where_statement = (
                f"\nDATE({date_field}) BETWEEN DATE('{START_DATE}') "
                f"AND DATE('{END_DATE}')\n"
            )
            print('where statement: ', where_statement)
        else:
            where_statement = None

        # If either table is empty, record the reason in column K and move on.
        is_source_table_empty = is_table_empty(bq_client, source_table, where_statement=where_statement)
        is_target_table_empty = is_table_empty(bq_client, target_table, where_statement=where_statement)
        if is_source_table_empty:
            sheet.update(f"K{idx}", [['SOURCE TABLE HAS NO ROWS']])
            continue
        if is_target_table_empty:
            sheet.update(f"K{idx}", [['TARGET TABLE HAS NO ROWS']])
            continue
        print('Neither source nor target table is empty. Proceeding with table_diff...')

        if grain is None:
            # sqlmesh table-diff can't handle diffs with no grains (even with
            # the skip_grain_check flag...) :-(
            print(f"Row {idx}: no usable grain for {table_name}, skipping table_diff.")
            continue

        # Instantiate the table_diff object based on the args collected above.
        diff = context.table_diff(
            source=source_table,
            target=target_table,
            on=exp.condition(grain),
            where=where_statement,
            show_sample=False,
            skip_columns=skip_columns,
        )
        print(f'Table diff complete for {table_name}')

        # Schemas match when nothing was added, modified or removed.
        schema_diff = diff.schema_diff()
        schemas_match = not (schema_diff.added or schema_diff.modified or schema_diff.removed)

        # Row-level checks.
        row_diff = diff.row_diff()
        row_counts_match = row_diff.source_count == row_diff.target_count
        # "Totally match" requires a (near-)100% full-match rate, every row
        # joined on both sides, and no partially matching rows.
        tables_totally_match = bool(
            row_diff.full_match_pct >= 99.999
            and row_diff.join_count == row_diff.source_count == row_diff.target_count
            and row_diff.partial_match_count == 0
        )

        # Write columns K..V in a single request. Writing them one cell at a
        # time could leave column K filled while later cells failed, and since
        # a non-empty K marks the row as done, the row would then be skipped on
        # every re-run with incomplete stats.
        sheet.update(
            f"K{idx}:V{idx}",
            [[
                tables_totally_match,            # K
                schemas_match,                   # L
                row_counts_match,                # M
                row_diff.source_count,           # N
                row_diff.target_count,           # O
                row_diff.join_count,             # P
                row_diff.full_match_pct,         # Q
                row_diff.full_match_count,       # R
                row_diff.partial_match_pct,      # S
                row_diff.partial_match_count,    # T
                row_diff.s_only_count,           # U
                row_diff.t_only_count,           # V
            ]],
        )
    except Exception as e:
        # Best-effort per row: report the failure and carry on with the next
        # model instead of aborting the whole run.
        failed_table = table_name if table_name is not None else f"<row {idx}>"
        print(f"Error querying table {failed_table}: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment