Skip to content

Instantly share code, notes, and snippets.

@jtalmi
Created January 20, 2020 23:04
Show Gist options
  • Save jtalmi/c6265c8a17120cfb150c97512cb68aa6 to your computer and use it in GitHub Desktop.
Save jtalmi/c6265c8a17120cfb150c97512cb68aa6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''Script to autogenerate dbt commands for changed models against a chosen git branch,
with support for fully refreshing models with specific tags.
Usage:
$ python3 dbt_run_changed.py --target_branch master --target dev --commands '[run,test]' --full_refresh_tags '[full_refresh]'
Assume model1 and model2 are changed models and model2 is tagged with "full_refresh". The script will generate three dbt commands:
1. dbt run --target dev --model model2 --full-refresh
2. dbt run --target dev --model model1
3. dbt test --target dev --model model1 model2
If we also pass the --full_refresh flag, command #2 becomes:
2. dbt run --target dev --model model1 --full-refresh
'''
import json
import logging
import os
import subprocess
import sys
from typing import Dict, List, Optional, Union

from fire import Fire
# Route log records to stdout (logging's default handler would use stderr),
# emitting INFO and above.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
def intersection_is_not_empty(a: List, b: List) -> bool:
    '''Return True when *a* and *b* share at least one element.

    Both arguments must contain hashable items.
    '''
    shared = set(a).intersection(b)
    return bool(shared)
def fetch_changed_models(branch: str = 'master') -> List:
    '''Return the names of dbt models changed relative to a git branch.

    Runs ``git diff --name-only`` against ``branch``. It keeps only ``.sql``
    files under ``models/`` that still exist on disk, so deleted models are
    dropped. Git reports paths relative to the repository root, while the
    existence check resolves them against the current working directory.
    The script is therefore expected to run from the repository root.

    Args:
        branch: git ref to diff against.

    Returns:
        Model names: file basenames with the ``.sql`` extension removed, in the
        order git reports them.

    Raises:
        subprocess.CalledProcessError: if ``git diff`` exits non-zero.
    '''
    # -z makes git emit NUL-terminated, unquoted paths. Otherwise, names
    # containing whitespace would be split apart and non-ASCII names would be
    # C-quoted (and then fail the 'models/' prefix check).
    # '--' stops git from ever interpreting `branch` as a pathspec.
    raw_output = subprocess.check_output(['git', 'diff', '--name-only', '-z', branch, '--'])
    changed_files = [path.decode('utf-8') for path in raw_output.split(b'\0') if path]
    changed_models = []
    for path in changed_files:
        if not (path.startswith('models/') and path.endswith('.sql')):
            continue
        # Skip files that were deleted relative to `branch`.
        if not os.path.exists(os.path.join(os.getcwd(), path)):
            continue
        changed_models.append(os.path.splitext(os.path.basename(path))[0])
    return changed_models
def fetch_project_json(list_of_models: Optional[List] = None) -> Dict:
    '''Return node metadata from ``dbt ls --output json``, keyed by node name.

    Args:
        list_of_models: optional selectors passed to ``dbt ls --model``. When
            empty or omitted, all nodes of resource type ``model`` are listed
            instead.

    Returns:
        Dict mapping each listed node's ``name`` to its parsed JSON record.

    Raises:
        subprocess.CalledProcessError: if ``dbt ls`` exits non-zero.
        json.JSONDecodeError: if a non-blank output line is not valid JSON.
    '''
    dbt_command = ['dbt', 'ls', '--output', 'json']
    if list_of_models:
        dbt_command += ['--model'] + list(list_of_models)
    else:
        dbt_command += ['--resource-type', 'model']
    output = subprocess.check_output(dbt_command).decode('utf-8')
    # One JSON object per line. Skip blank lines rather than blindly dropping
    # the last element: the old `split('\n')[:-1]` silently lost the final
    # record whenever the output did not end with a newline.
    records = [json.loads(line) for line in output.splitlines() if line.strip()]
    return {record['name']: record for record in records}
def generate_dbt_command(command: str, target: str, include_models: Optional[List] = None,
                         exclude_models: Optional[List] = None, children: bool = False,
                         full_refresh: bool = False) -> List:
    '''Build a single dbt CLI invocation as an argument list.

    Args:
        command: dbt subcommand, e.g. ``'run'`` or ``'test'``.
        target: dbt target, passed via ``--target``.
        include_models: models to select with ``--model``. The flag is omitted
            when this is empty or None.
        exclude_models: models to pass to ``--exclude``. The flag is omitted
            when this is empty or None.
        children: if True, append ``+`` to each included model so dbt also
            selects its downstream children.
        full_refresh: if True, append ``--full-refresh``.

    Returns:
        The command as a list of strings, suitable for ``subprocess.call``.
    '''
    # None defaults replace the previous shared mutable ``[]`` defaults.
    include_models = list(include_models or [])
    exclude_models = list(exclude_models or [])
    dbt_command = ['dbt', command, '--target', target]
    if include_models:
        selectors = [f'{model}+' for model in include_models] if children else include_models
        dbt_command += ['--model'] + selectors
    if exclude_models:
        dbt_command += ['--exclude'] + exclude_models
    if full_refresh:
        dbt_command.append('--full-refresh')
    return dbt_command
def generate_dbt_commands(project_dict: Dict, command: str, target: str, model_names: List,
                          children: bool, full_refresh: bool, full_refresh_tags: List) -> List:
    '''Generate the dbt commands for one dbt subcommand over the changed models.

    For ``'run'``, up to two commands are produced:
      - a ``--full-refresh`` run of the models carrying any of ``full_refresh_tags``;
      - a run of the remaining models, with ``--full-refresh`` only if
        ``full_refresh`` is True.
    For ``'test'``, one command covering all ``model_names`` is produced.
    No command is produced for an empty ``model_names``. Any other subcommand
    yields no commands and logs a warning.

    Args:
        project_dict: node metadata keyed by model name; only
            ``['config']['tags']`` is read, and only for ``'run'`` when
            ``full_refresh_tags`` is non-empty.
        command: dbt subcommand, ``'run'`` or ``'test'``.
        target: dbt target.
        model_names: changed model names; their order is preserved.
        children: also select each model's downstream children (``model+``).
        full_refresh: full-refresh the untagged ``run`` models as well.
        full_refresh_tags: tags that force ``--full-refresh`` on ``run``.

    Returns:
        A list of commands, each an argument list.
    '''
    dbt_commands = []
    remaining_models = list(model_names)
    if command == 'run':
        if full_refresh_tags:
            tagged_models = []
            for model in remaining_models:
                # A model missing from project_dict (e.g. a file `dbt ls` did
                # not report) is treated as untagged instead of raising KeyError.
                tags = project_dict.get(model, {}).get('config', {}).get('tags') or []
                if intersection_is_not_empty(full_refresh_tags, tags):
                    tagged_models.append(model)
            if tagged_models:
                dbt_commands.append(generate_dbt_command(command, target, tagged_models,
                                                         children=children, full_refresh=True))
                tagged = set(tagged_models)
                # Preserve the caller's order; the old set difference did not,
                # making the generated command order nondeterministic.
                remaining_models = [model for model in remaining_models if model not in tagged]
        if remaining_models:
            dbt_commands.append(generate_dbt_command(command, target, remaining_models,
                                                     children=children, full_refresh=full_refresh))
    elif command == 'test':
        # Guard against an empty selection: without --model, `dbt test` would
        # select every test in the project.
        if remaining_models:
            dbt_commands.append(generate_dbt_command(command, target, remaining_models,
                                                     children=children))
    else:
        logging.warning("Skipping unsupported command %r: only 'run' and 'test' are handled", command)
    return dbt_commands
def _as_list(value) -> List:
    '''Return *value* as a list, wrapping a lone string and mapping None to [].

    python-fire parses ``--commands run`` as the string ``'run'`` rather than
    ``['run']``. Iterating that string would yield single characters.
    '''
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def generate_and_execute_dbt_commands(target_branch: str, target: str,
                                      commands: Optional[List] = None,
                                      children: bool = False, full_refresh: bool = False,
                                      full_refresh_tags: Optional[List] = None) -> Union[str, List[str]]:
    '''Find models changed against a git branch, then build and run dbt commands for them.

    Args:
        target_branch: git ref to diff against.
        target: dbt target passed to every command.
        commands: dbt subcommands to run, in order; defaults to ``['run', 'test']``.
            A single string is treated as a one-element list.
        children: also select each changed model's downstream children.
        full_refresh: full-refresh every ``run``, not just the tagged models.
        full_refresh_tags: tags whose models are always run with ``--full-refresh``.
            A single string is treated as a one-element list.

    Returns:
        ``"No changed models"`` when nothing changed. Otherwise, the executed
        commands as space-joined strings.

    Raises:
        subprocess.CalledProcessError: if a dbt command exits non-zero. The
            remaining commands are not run.
    '''
    commands = ['run', 'test'] if commands is None else _as_list(commands)
    full_refresh_tags = _as_list(full_refresh_tags)

    list_of_changed_models = fetch_changed_models(target_branch)
    if not list_of_changed_models:
        return "No changed models"
    # Model tags are only consulted for 'run' with non-empty full_refresh_tags,
    # so skip the (slow) `dbt ls` call otherwise.
    project_dict = fetch_project_json() if full_refresh_tags and 'run' in commands else {}
    logging.info('Changed models: %s', " ".join(list_of_changed_models))
    logging.info('Full refresh models with these tags: %s', full_refresh_tags)

    dbt_commands = []
    for command in commands:
        dbt_commands += generate_dbt_commands(project_dict, command, target, list_of_changed_models,
                                              children, full_refresh, full_refresh_tags)
    for dbt_command in dbt_commands:
        # TODO: change to debug
        logging.info("Executing: %s", " ".join(dbt_command))
        dbt_return_code = subprocess.call(dbt_command)
        if dbt_return_code != 0:
            # CalledProcessError subclasses Exception, so existing handlers
            # still match, and it carries the exit code and the command.
            raise subprocess.CalledProcessError(dbt_return_code, dbt_command)
    return [" ".join(command) for command in dbt_commands]
if __name__ == '__main__':
    # Expose generate_and_execute_dbt_commands as the command-line interface:
    # python-fire maps flags such as --target_branch and --target onto its parameters.
    Fire(component=generate_and_execute_dbt_commands)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment