@jtalmi
Last active March 15, 2022 20:41
dbt linter -- check for unique/not_null tests and description/columns
#!/usr/bin/env python3
"""
CI script to check:
1. Models have both a unique and not_null test.
2. Models have a description and columns (i.e. a schema.yml entry)
"""
import json
import logging
import os
import subprocess
import sys
from collections import Counter
from typing import List, Tuple
import re
import pandas as pd
from fire import Fire
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
MANDATORY_SCHEMA_TESTS = {'unique', 'not_null'}
RELATIVE_MANIFEST_PATH = "target/manifest.json"
DEPLOYMENT_TAGS = {'hourly', 'nightly', 'weekly', 'external'}
DOMAIN_TAGS = {
'finance', 'ops', 'growth', 'product', 'supply', 'catalogue', 'infra', 'risk', 'flights', 'netsuite', 'connectivity', 'goods'
}
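# Each model is expected to carry exactly one tag from DEPLOYMENT_TAGS and exactly one
# from DOMAIN_TAGS, with no duplicates; main() flags any model that violates this.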

def fetch_changed_objects(branch: str = 'master') -> Tuple[List, List, List]:
    '''Return changed dbt models, changed macros, and newly added models relative to a git branch.'''
list_of_changed_files = subprocess.check_output(["git", "diff", branch, "--name-only"]).split()
list_of_changed_files = [file.decode('utf-8') for file in list_of_changed_files]
list_of_changed_models = [os.path.splitext(model)[0] for model in list_of_changed_files if model.startswith('models/') and model.endswith('.sql')] # pylint: disable=line-too-long
list_of_remaining_changed_models = [os.path.basename(model) for model in list_of_changed_models if os.path.exists("{}/{}.sql".format(os.getcwd(), model))] # pylint: disable=line-too-long
list_of_changed_macros = [os.path.splitext(model)[0] for model in list_of_changed_files if model.startswith('macros/') and model.endswith('.sql')] # pylint: disable=line-too-long
list_of_remaining_changed_macros = [os.path.basename(macro) for macro in list_of_changed_macros if os.path.exists("{}/{}.sql".format(os.getcwd(), macro))] # pylint: disable=line-too-long
    list_of_new_models = subprocess.check_output(["git", "diff", branch, "--name-only", "--diff-filter=A"]).split()
list_of_new_models = [file.decode('utf-8') for file in list_of_new_models]
list_of_new_models = [os.path.splitext(model)[0] for model in list_of_new_models if model.startswith('models/') and model.endswith('.sql')]
list_of_remaining_new_models = [os.path.basename(model) for model in list_of_new_models if os.path.exists("{}/{}.sql".format(os.getcwd(), model))] # pylint: disable=line-too-long
return list_of_remaining_changed_models, list_of_remaining_changed_macros, list_of_remaining_new_models

def compare_manifests(old_manifest='tmp/manifest.json',
                      new_manifest=RELATIVE_MANIFEST_PATH,
                      compare_cols=['materialized', 'tags', 'snowflake_warehouse', 'cluster_by']):
    '''Print a markdown diff of selected config fields between an old and a new manifest.'''
with open(old_manifest, "r") as _file:
old_manifest = {k: v for k, v in json.load(_file)["nodes"].items() if k.startswith('model')}
with open(new_manifest, "r") as _file:
new_manifest = {k: v for k, v in json.load(_file)["nodes"].items() if k.startswith('model')}
model_list = set(old_manifest.keys()) | set(new_manifest.keys())
comparisons = {}
headers = []
for col in compare_cols:
headers.append(col + '_old')
headers.append(col + '_new')
for model in model_list:
compare_dict = {}
models_same = True
if not old_manifest.get(model):
old_manifest[model] = {}
elif not new_manifest.get(model):
new_manifest[model] = {}
for col in compare_cols:
config_field = 'config'
if old_manifest[model].get(config_field, {}).get(col) != new_manifest[model].get(config_field, {}).get(col):
compare_dict[col + '_old'] = old_manifest[model].get(config_field, {}).get(col)
compare_dict[col + '_new'] = new_manifest[model].get(config_field, {}).get(col)
models_same = False
if not models_same:
comparisons[model] = compare_dict
df = pd.DataFrame(comparisons).T
df.index = df.index.map(lambda x: x.split('.')[2])
if 'snowflake_warehouse_old' in df.columns:
df[['snowflake_warehouse_old',
'snowflake_warehouse_new']] = df[['snowflake_warehouse_old',
'snowflake_warehouse_new']].apply(lambda x: x.str[:-10], axis=1)
for col in compare_cols:
if col + '_new' in df.columns or col + '_old' in df.columns:
logging.info('Printing diff for config field: %s', col)
temp_df = df[[col + '_old', col + '_new']].copy()
temp_df = temp_df[~((temp_df[col + '_old'].isnull()) & (temp_df[col + '_new'].isnull()))]
print('\n', temp_df.sort_values(col + '_new').to_markdown(), '\n')

def main(only_changed_models=True, compare_branch='origin/master'):
    '''Check each model for mandatory tests, schema.yml coverage, valid tags, inline configs, and naming.'''
with open(RELATIVE_MANIFEST_PATH, "r") as _file:
# Fetch model/test data from manifest
manifest = json.load(_file)["nodes"]
models_per_test = {k: v["depends_on"]["nodes"] for k, v in manifest.items() if v["resource_type"] == "test"}
tests_per_model = invert_dict(models_per_test)
all_models = {k: [] for k, v in manifest.items() if v["resource_type"] == "model"}
# contains all models in manifest as keys and the list of
# tests (possibly empty) currently applied to each model, as values
tests_per_all_models = {**all_models, **tests_per_model}
tests_per_all_models = {k: v for k, v in tests_per_all_models.items() if not k.startswith('source')}
    # Default to empty lists so new_models is defined even when linting all models.
    changed_models, changed_macros, new_models = [], [], []
    if only_changed_models:
        changed_models, changed_macros, new_models = fetch_changed_objects(compare_branch)
        tests_per_all_models = {k: v for k, v in tests_per_all_models.items() if manifest[k]['name'] in changed_models}
# Perform test coverage and schema yml checks
models_without_tests = set()
models_without_schema_yml = set()
models_with_invalid_tags = set()
models_with_inline_configs = set()
models_without_column_descriptions = []
models_without_taxonomy = []
for model, tests in tests_per_all_models.items():
unique_tags = set(manifest[model]['tags'])
columns = manifest[model]['columns']
# TODO Remove this conditional
if 'skip-lint' in manifest[model]['tags']:
continue
exact_model = model.split('.')[-1]
if '{{config' in ''.join(manifest[model]['raw_sql'].split()):
models_with_inline_configs.add(exact_model)
for schema_test in MANDATORY_SCHEMA_TESTS:
if not any(schema_test in test for test in tests):
if 'non_unique' in manifest[model]['tags'] and schema_test == 'unique':
continue
logging.error(f'No {schema_test} test found for model: %s', exact_model)
models_without_tests.add(exact_model)
if not manifest[model]['description']:
models_without_schema_yml.add(exact_model)
if not columns:
models_without_schema_yml.add(exact_model)
else:
invalid_cols = [key for key in columns.keys() if not columns[key]['description'].strip()]
if invalid_cols:
payload = (exact_model, invalid_cols)
models_without_column_descriptions.append(payload)
if len(unique_tags) != len(manifest[model]['tags']):
counts = Counter(manifest[model]['tags'])
duplicates = [tag for tag, count in counts.items() if count > 1]
models_with_invalid_tags.add(
(exact_model, f'Duplicate tags found: {duplicates}. Current tags: {manifest[model]["tags"]}'))
if len(unique_tags.intersection(DEPLOYMENT_TAGS)) != 1:
models_with_invalid_tags.add(
(exact_model, f'Invalid deployment tag found. Current tags: {manifest[model]["tags"]}'))
if len(unique_tags.intersection(DOMAIN_TAGS)) != 1:
models_with_invalid_tags.add(
(exact_model, f'Invalid domain tag found. Current tags: {manifest[model]["tags"]}'))
        if not (re.match(r'(stg|dim|fct|mart)_', exact_model) or re.match(r'_base', exact_model)) and exact_model in new_models:  # pylint: disable=line-too-long
models_without_taxonomy.append(exact_model)
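    # Print a config diff against the pre-change manifest; this assumes a manifest compiled
    # from the base branch has already been placed at tmp/manifest.json (the default
    # old_manifest path of compare_manifests).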
compare_manifests()
# Exit if test coverage or schema checking fails
check_fail = models_without_tests or models_without_schema_yml or models_with_invalid_tags or models_with_inline_configs \
or models_without_column_descriptions or models_without_taxonomy
if check_fail:
if models_without_tests:
logging.error("\n\u274C The following models did not have both unique and not_null tests: \n - %s",
"\n - ".join(models_without_tests))
        if models_without_schema_yml:
logging.error(
'''\n\u274C The following models did not have valid schema.yml entries: \n - %s\n
            You can auto-generate a schema.yml from scratch using this command:
dbt run-operation generate_model_yml --args '{"model_name": "your_model"}'.
''', "\n - ".join(models_without_schema_yml))
        if models_with_invalid_tags:
logging.error(
f"\n\u274C The following models had invalid tags, each must contain one domain tag from {DOMAIN_TAGS} "
f"and one deployment tag from : {DEPLOYMENT_TAGS}. Tags are now defined in the dbt_project.yml"
"\n - %s", # pylint: disable=line-too-long
"\n - ".join([": ".join(pair) for pair in models_with_invalid_tags]))
        if models_with_inline_configs:
logging.error(
f"\n\u274C The following models have configs defined inline. Please add model configs "
f"to the dbt_project.yml"
"\n - %s", # pylint: disable=line-too-long
"\n - ".join(models_with_inline_configs))
        if models_without_column_descriptions:
logging.error(
f"\n\u274C The following models have no column descriptions. You can use"
f" dbt run-operation generate_model_yml --args {{'model_name': 'your_model'}} to generate the schema docs"
f" Please add column descriptions for the following columns"
"\n - %s", # pylint: disable=line-too-long
"\n - ".join([f'{model}: {cols}' for model, cols in models_without_column_descriptions]))
        if models_without_taxonomy:
logging.error(
f"\n\u274C Please denote this model as a stg, dim, fct, or marts model."
f" For more information on naming conventions, see here:"
f" https://snaptravelinc.atlassian.net/wiki/spaces/GROW/pages/113803469/Data+Modelling+Workflow#DataModellingWorkflow-LintingandCIchecks'"
"\n - %s", # pylint: disable=line-too-long
"\n - ".join([f'{model}' for model in models_without_taxonomy]))
sys.exit(1) # exit with failure

def invert_dict(dict_of_list):
    """Invert a dict of lists: map each list element to the list of keys whose value contained it."""
inverted = {}
for k, value in dict_of_list.items():
for v in value:
inverted.setdefault(v, []).append(k)
return inverted

if __name__ == "__main__":
Fire(main)