-
-
Save fivetran-ezralee/f48ff0ae10737bbe7ebc2c97917c9601 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Iterate through the records of the existing gsheet of models, run table_diff
# for each one, and then populate the table_diff stat columns (K..V) of that
# record's row in the gsheet.
from sqlglot import exp

for idx, record in enumerate(records, start=2):  # skip row 1 which is just headers
    # Placeholder so the error handler at the bottom always has something to
    # report, even when the failure happens before `model_name` has been read.
    table_name = f"<row {idx}>"
    try:
        # Get the value from Column K (11th column - aka `tables_totally_match`
        # field) in the current row. A non-null value indicates we have already
        # run table_diff for this model/record.
        cell_value = sheet.cell(idx, 11).value
        # If Column K already has a value, skip to the next model/record.
        if cell_value:
            print(f"Row {idx}: Column K already has a value ('{cell_value}'), skipping this row.")
            continue

        # Parse all necessary data from the gsheet `record`.
        schema = record["schema_name"]
        table_name = record["model_name"]
        if SOURCE_SCHEMA_OVERRIDE:
            source_table = f'{PROJECT}.{SOURCE_SCHEMA_OVERRIDE}.{table_name}'
        else:
            source_table = f'{PROJECT}.zz_{schema}.{table_name}'
        if TARGET_SCHEMA_OVERRIDE:
            target_table = f'{PROJECT}.{TARGET_SCHEMA_OVERRIDE}.{table_name}'
        else:
            target_table = f'{PROJECT}.zz_sqlmesh_{schema}.{table_name}'

        # Do not assign a grain if none is obvious. Note: in cases of no grain,
        # accuracy and extent of table_diff are limited.
        # NOTE(review): `== False` is kept on purpose (rather than `is False`)
        # because the type of the sheet value is not visible here -- confirm.
        if (
            not record["unique_key"]
            or record["unique_key"].lower() == "no grain"
            or record["primary_keys_confirmed"] == False  # noqa: E712
        ):
            grain = None
        else:
            grain = parse_grain(record["unique_key"])
        print('grain: ', grain)

        # Conditionally parse the skip_columns: we want to remove STRUCT/complex
        # data type fields from the analysis because they break table-diff.
        skip_columns = ast.literal_eval(record["struct_fields"]) if record["struct_fields"] else None
        print('skip columns: ', skip_columns)

        # Conditionally collate the where_statement: only restrict by date when
        # the record names a date field and both window bounds are configured.
        if record["date_field"] and START_DATE and END_DATE:
            date_field = record["date_field"]
            where_statement = f"""
                DATE({date_field}) BETWEEN DATE('{START_DATE}') AND DATE('{END_DATE}')
            """
            print('where statement: ', where_statement)
        else:
            where_statement = None

        # Check if either source/target table is empty -- if so, skip it.
        is_source_table_empty = is_table_empty(bq_client, source_table, where_statement=where_statement)
        is_target_table_empty = is_table_empty(bq_client, target_table, where_statement=where_statement)
        if is_source_table_empty:
            # Source table empty: log the reason in column K, skip table_diff.
            sheet.update(f"K{idx}", [['SOURCE TABLE HAS NO ROWS']])
            continue
        if is_target_table_empty:
            # Target table empty: log the reason in column K, skip table_diff.
            sheet.update(f"K{idx}", [['TARGET TABLE HAS NO ROWS']])
            continue
        print('Neither source nor target table is empty. Proceeding with table_diff...')

        if grain is None:
            # sqlmesh table-diff can't handle diffs with no grains (even with
            # the skip_grain_check flag...). Column K is deliberately left
            # empty, so this row is picked up again on the next run.
            print(f"Row {idx}: no grain available for {table_name}, skipping table_diff.")
            continue

        # Instantiate the table_diff object based on the above args.
        diff = context.table_diff(
            source=source_table,
            target=target_table,
            on=exp.condition(grain),
            where=where_statement,
            show_sample=False,
            skip_columns=skip_columns,
        )
        print(f'Table diff complete for {table_name}')

        # Schemas match when nothing was added, modified, or removed.
        schema_diff = diff.schema_diff()
        schemas_match = not (schema_diff.added or schema_diff.modified or schema_diff.removed)

        # Now do the row checks.
        row_diff = diff.row_diff()
        row_counts_match = bool(row_diff.source_count == row_diff.target_count)
        # A total match requires (near-)100% fully matching rows, every row
        # joined on both sides, and no partially matching rows.
        tables_totally_match = bool(
            row_diff.full_match_pct >= 99.999
            and row_diff.join_count == row_diff.source_count == row_diff.target_count
            and row_diff.partial_match_count == 0
        )

        # Update the sheet. Columns K..V are written in ONE request: column K
        # doubles as the "already processed" marker checked at the top of the
        # loop, so writing it separately from the other stats could leave a row
        # flagged as done with its remaining columns missing if a later write
        # failed. A single write also uses one API call per row instead of 12.
        sheet.update(
            f"K{idx}:V{idx}",
            [[
                tables_totally_match,            # K
                schemas_match,                   # L
                row_counts_match,                # M
                row_diff.source_count,           # N
                row_diff.target_count,           # O
                row_diff.join_count,             # P
                row_diff.full_match_pct,         # Q
                row_diff.full_match_count,       # R
                row_diff.partial_match_pct,      # S
                row_diff.partial_match_count,    # T
                row_diff.s_only_count,           # U
                row_diff.t_only_count,           # V
            ]],
        )
    except Exception as e:
        # Per-row boundary: report the failure and move on to the next record
        # so one bad model does not abort the whole run.
        print(f"Error querying table {table_name}: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment