Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
profile_dir: '~/.dbt' # the location of dbt configurations
target: prod # the dbt profile to use for connection
source_schema: fivetran_app # the tables' source schema (where fivetran imports your data to)
materialized: view # how to materialize these views
target_schema: app # the schema to materialize the base models under (where users will access the data from)
table_prefix: app # a prefix to prepend to each base model/view
directory: models/base/ # the directory under which to save the base models
empty_as_null: true # whether or not to apply EMPTYASNULL logic to all text fields
incl_fivetran_deleted: false # whether or not to include records marked as "deleted" by fivetran
excl_fivetran_synced: true # whether or not to include the time fivetran synced each record
tables: # the tables to generate base models for
- users
- orders
#!/usr/bin/env python
import os
import argparse
import yaml
import copy
import psycopg2
from jinja2 import Template
TEMPLATE = """
{{ '{{' }}
config(
schema='{{ target_schema }}',
materialized='{{ materialized }}',
bind=False
)
{{ '}}' }}
SELECT
{{ select_statements }}
FROM
"{{ table['schema'] }}"."{{ table['table'] }}"
{% if not incl_fivetran_deleted %}
WHERE NOT "_fivetran_deleted"
{% endif %}
"""
class DbtRedshiftGen(object):
def __init__(self, **kwargs):
self.profiles = self._get_profile(kwargs.pop('profile_dir'))
self.target = kwargs.pop('target')
self.source_schema = kwargs.pop('source_schema')
self.tables = kwargs.pop('tables')
self.all_tables = kwargs.pop('all_tables', False)
self.table_prefix = kwargs.pop('table_prefix', '')
self.directory = kwargs.pop('directory', 'models/base')
self.incl_fivetran_deleted = kwargs.get('incl_fivetran_deleted')
self.excl_fivetran_synced = kwargs.get('excl_fivetran_synced')
self.empty_as_null = kwargs.get('empty_as_null')
self.template_args = kwargs
self.template = Template(TEMPLATE)
def _get_profile(self, profile_dir):
"""
Load dbt profile.
"""
return yaml.safe_load(open(os.path.join(os.path.expanduser(profile_dir), 'profiles.yml')))
@property
def credentials(self):
"""
Load dbt credentials
"""
return self.profiles['default']['outputs'][self.target]
def get_connection(self):
return psycopg2.connect(
dbname=self.credentials.get('dbname'),
user=self.credentials.get('user'),
host=self.credentials.get('host'),
password=self.credentials.get('pass'),
port=self.credentials.get('port'),
connect_timeout=10)
@property
def source_schema_sql(self):
# determine query filters
if self.all_tables:
table_filter = '1=1'
else:
table_filter = "table_name IN ('{0}')".format("','".join(self.tables))
if self.incl_fivetran_deleted:
fivetran_deleted_filter = "1=1"
else:
fivetran_deleted_filter = "column_name != '_fivetran_deleted'"
if not self.excl_fivetran_synced:
fivetran_synced_filter = "1=1"
else:
fivetran_synced_filter = "column_name != '_fivetran_synced'"
return \
"""SELECT
table_schema as "schema",
table_name as "table",
column_name as "name",
data_type as "type",
CASE
WHEN data_type IN ('character varying', 'text')
THEN TRUE
ELSE FALSE
END as "is_text"
FROM
information_schema.columns
WHERE table_schema = '{source_schema}'
AND {table_filter}
AND {fivetran_deleted_filter}
AND {fivetran_synced_filter}
ORDER BY table_name, ordinal_position ASC;
""".format(source_schema=self.source_schema,
table_filter=table_filter,
fivetran_synced_filter=fivetran_synced_filter,
fivetran_deleted_filter=fivetran_deleted_filter)
def get_tables(self):
"""
Run query and fetch tables.
"""
cursor = self.get_connection().cursor()
cursor.execute(self.source_schema_sql)
tables = {}
for schema, table, name, type, is_text in cursor.fetchall():
col = { 'name': name, 'type': type, 'is_text': is_text }
if table not in tables:
tables[table] = {
'table': table,
'schema': schema,
'columns':[col]
}
else:
tables[table]['columns'].append(col)
print(tables)
return tables
def gen_select_statements_for_table(self, table):
"""
Select columns in a table.
"""
statements = []
for col in table['columns']:
if col['is_text'] and self.empty_as_null:
statement = 'CASE WHEN "{name}" = \'\' THEN NULL ELSE "{name}" END AS "{name}"'.format(**col)
else:
statement = '"{name}"'.format(**col)
statements.append(statement)
if self.incl_fivetran_deleted:
statements.append("_fivetran_deleted")
if not self.excl_fivetran_synced:
statements.append("_fivetran_synced")
return ",\n\t".join(statements)
def gen_model_for_table(self, table):
kwargs = copy.copy(self.template_args)
kwargs.update({
'select_statements':self.gen_select_statements_for_table(table),
'table': table
})
return self.template.render(**kwargs)
def gen_filepath_for_model(self, table):
"""
Create a filepath to write this model to.
"""
name = "{table}.sql".format(**table)
if self.table_prefix:
name = "_".join([self.table_prefix, name])
return os.path.join(self.directory, name)
def write_model_for_table(self, table):
"""
Write a base model for a table
"""
fp = self.gen_filepath_for_model(table)
print("Generating model: {0}".format(fp))
with open(fp, 'w') as f:
f.write(self.gen_model_for_table(table))
def run(self):
"""
Generate the models.
"""
if not os.path.exists(self.directory):
os.mkdir(self.directory)
for table_name, table in self.get_tables().items():
self.write_model_for_table(table)
def cli():
"""
Command line interface.
"""
parser = argparse.ArgumentParser(prog="dbt-gen")
parser.add_argument("--yaml", default=None,
help="A yaml file of arguments to pass to this program.")
parser.add_argument("--profile-dir", default="~/.dbt",
help="The location of your dbt profile directory. Defaults to '~/.dbt'.")
parser.add_argument("--target", default="dev",
help="Which target to load for the given profile.")
parser.add_argument("--source-schema",
help="The schema to generate base models for.")
parser.add_argument("--materialize", nargs="+", default="ephemeral",
help="How to materialize the models: ephemeral|view|table")
parser.add_argument("--target-schema",
help="If materialize == view, The schema to materialize base models as views under.")
parser.add_argument("--tables", nargs="+",
help="A list of space-separated tables under --schema to create base models for.")
parser.add_argument("--all-tables", action="store_true", default=False,
help="Import all tables under --schema.")
parser.add_argument("--table-prefix", default=None,
help="A string to prefix all generated dbt model names with.")
parser.add_argument("--directory", default="models/base/",
help="The directory to store generated models under.")
parser.add_argument("--empty-as-null", action="store_true", default=False,
help="Whether or not to apply EMPTYASNULL logic to character fields.")
parser.add_argument("--incl-fivetran-deleted", action="store_true", default=False,
help="Whether or not to include records that have been marked deleted by fivetran.")
parser.add_argument("--excl-fivetran-synced", action="store_true", default=False,
help="Whether or not to include fivetran's synced column.")
args = parser.parse_args()
if args.yaml:
kwargs = yaml.safe_load(open(args.yaml))
else:
kwargs = vars(args)
DbtRedshiftGen(**kwargs).run()
if __name__ == '__main__':
cli()
@abelsonlive

This comment has been minimized.

Copy link
Owner Author

commented Feb 2, 2018

Usage:

chmod +x generate_dbt_base_models.py
./generate_dbt_base_models.py --yaml base_models.yml
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.