Skip to content

Instantly share code, notes, and snippets.

@sungchun12
Last active November 4, 2022 21:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sungchun12/bfd231c4f7d1c17fe0e751305f0a820c to your computer and use it in GitHub Desktop.
Save sungchun12/bfd231c4f7d1c17fe0e751305f0a820c to your computer and use it in GitHub Desktop.
Add column types, constraints, and default values when creating tables in dbt: https://www.loom.com/share/14020499f5f646b6bc80c909716850fd
# dbt schema file for the constraints_example model.
# The extra column keys (column_type, constraint, default_value) are read by
# snowflake__create_table_with_constraints_as to build the column DDL.
version: 2

models:
  - name: constraints_example
    config:
      # The macro only emits the explicit column list when this flag is truthy.
      constraints_enabled: true
    columns:
      - name: id
        column_type: integer
        constraint: not null
        description: "I want to describe this one, but I don't want to list all the columns"

      - name: color
        column_type: string

      - name: date_day
        column_type: date
        # Rendered verbatim after the `default` keyword in the column definition.
        default_value: CURRENT_DATE()
{#
  Example model: a single hard-coded row, materialized through the custom
  table_with_constraints materialization so the column types, constraints and
  defaults declared in the schema file end up in the table DDL.
#}
{{ config(materialized="table_with_constraints") }}

select
    1 as id,
    'blue' as color,
    cast('2019-01-01' as date) as date_day
-- Expected output
{#
  create or replace TRANSIENT TABLE ANALYTICS.DBT_SUNG.CONSTRAINTS_EXAMPLE (
      ID NUMBER(38,0) NOT NULL,
      COLOR VARCHAR(16777216),
      DATE_DAY DATE DEFAULT CURRENT_DATE()
  );
#}
{#
  Builds the statement(s) that create `relation` from `compiled_code`.

  Arguments:
    temporary      truthy -> emit `temporary`; this also suppresses `copy grants`
                   and the clustering `alter table` statements.
    relation       target relation, rendered verbatim into the DDL.
    compiled_code  the model's compiled SQL (or Python source for language='python').
    language       'sql' (default) or 'python'; anything else raises a compiler error.

  Config values read: transient (default true), cluster_by, automatic_clustering,
  copy_grants, sql_header, constraints_enabled.

  When constraints_enabled is truthy, every entry of model['columns'] is rendered as
  `<name> <column_type> [default <default_value>] [<constraint>]` in an explicit
  column list placed between the table name and `as`.
#}
{% macro snowflake__create_table_with_constraints_as(temporary, relation, compiled_code, language='sql') -%}
  {%- if language == 'sql' -%}
    {%- set transient = config.get('transient', default=true) -%}
    {%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
    {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
    {%- set copy_grants = config.get('copy_grants', default=false) -%}

    {#- A single cluster key may be given as a bare string; normalise it to a list. -#}
    {%- if cluster_by_keys is not none and cluster_by_keys is string -%}
      {%- set cluster_by_keys = [cluster_by_keys] -%}
    {%- endif -%}
    {%- if cluster_by_keys is not none -%}
      {%- set cluster_by_string = cluster_by_keys|join(", ") -%}
    {%- else -%}
      {%- set cluster_by_string = none -%}
    {%- endif -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {#- Columns declared for this model in the schema file. -#}
    {%- set user_provided_columns = model['columns'] -%}

    {{ sql_header if sql_header is not none }}

    create or replace {% if temporary -%}
      temporary
    {%- elif transient -%}
      transient
    {%- endif %} table {{ relation }}
    {# Explicit column list with data types, defaults and constraints. #}
    {% if config.get('constraints_enabled', False) %}
    (
      {% for col in user_provided_columns.values() %}
      {{ col['name'] }} {{ col['column_type'] }} {% if col['default_value'] -%} default {{ col['default_value'] }} {%- endif %} {{ col['constraint'] or "" }} {{ "," if not loop.last }}
      {% endfor %}
    )
    {% endif %}
    {# Snowflake's CTAS grammar places COPY GRANTS after the column list, directly before AS. #}
    {% if copy_grants and not temporary -%} copy grants {%- endif %}
    as
    (
      {%- if cluster_by_string is not none -%}
        select * from(
          {{ compiled_code }}
          ) order by ({{ cluster_by_string }})
      {%- else -%}
        {{ compiled_code }}
      {%- endif %}
    );
    {% if cluster_by_string is not none and not temporary -%}
      alter table {{relation}} cluster by ({{cluster_by_string}});
    {%- endif -%}
    {% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
      alter table {{relation}} resume recluster;
    {%- endif -%}

  {%- elif language == 'python' -%}
    {{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }}
  {%- else -%}
    {# Name this macro in the error so the failure can be traced to the right place. #}
    {% do exceptions.raise_compiler_error("snowflake__create_table_with_constraints_as macro didn't get supported language, it got %s" % language) %}
  {%- endif -%}
{% endmacro %}
-- Compiled DDL sample for the constraints_example model: explicit column
-- definitions followed by the model's select wrapped in a CTAS.
create or replace transient table ANALYTICS.dbt_sung.constraints_example
(
    id NUMBER(38,0) not null,
    -- Must be wide enough for the 4-character literal 'blue' selected below;
    -- this is the width shown in the model's expected DDL for the string column.
    color VARCHAR(16777216),
    date_day date default CURRENT_DATE()
)
as
(
    select
        1 as id,
        'blue' as color,
        cast('2019-01-01' as date) as date_day
    -- Expected output
);
{#
  Custom Snowflake materialization: builds the model as a table through
  snowflake__create_table_with_constraints_as (always non-temporary), then
  applies grants and persists docs. Returns the created relation.
#}
{% materialization table_with_constraints, adapter='snowflake', supported_languages=['sql', 'python'] %}

  {# Tag the session first; the previous tag is restored at the very end. #}
  {% set query_tag_before_run = set_query_tag() %}

  {%- set table_name = model['alias'] -%}
  {%- set model_language = model['language'] -%}
  {% set grants = config.get('grants') %}

  {%- set existing_relation = adapter.get_relation(
        database=database,
        schema=schema,
        identifier=table_name) -%}
  {%- set target_relation = api.Relation.create(
        database=database,
        schema=schema,
        identifier=table_name,
        type='table') -%}

  {{ run_hooks(pre_hooks) }}

  {#
    Anything already at this name that is not a table is dropped first so it can
    be replaced by a table. This may cause downtime, but should be infrequent.
  #}
  {% if existing_relation is not none and not existing_relation.is_table %}
    {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }}
    {{ drop_relation_if_exists(existing_relation) }}
  {% endif %}

  {% call statement('main', language=model_language) -%}
    {{ snowflake__create_table_with_constraints_as(False, target_relation, compiled_code, model_language) }}
  {%- endcall %}

  {{ run_hooks(post_hooks) }}

  {# Held under its own name so the should_revoke macro is not shadowed. #}
  {% set revoke_existing_grants = should_revoke(existing_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grants, should_revoke=revoke_existing_grants) %}

  {% do persist_docs(target_relation, model) %}

  {% do unset_query_tag(query_tag_before_run) %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
{#
  Renders the Python program for a Python model: the model's compiled code
  followed by a `materialize` helper and a `main(session)` entry point.

  Arguments:
    compiled_code    Python source of the model, emitted first and verbatim.
    target_relation  rendered by Jinja into the save_as_table(...) call; the Python
                     parameter of the same name on materialize() is not read.
    temporary        rendered as the value of create_temp_table (default False).

  The emitted text is Python, so its block indentation is significant and every
  top-level statement must start at column 0.
#}
{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
{{ compiled_code }}
def materialize(session, df, target_relation):
    # make sure pandas exists
    import importlib.util
    package_name = 'pandas'
    if importlib.util.find_spec(package_name):
        import pandas
        if isinstance(df, pandas.core.frame.DataFrame):
            # session.write_pandas does not have overwrite function
            df = session.createDataFrame(df)
    # Runs whether or not pandas is installed, so the write sits at function level.
    df.write.mode("overwrite").save_as_table("{{ target_relation }}", create_temp_table={{temporary}})

def main(session):
    dbt = dbtObj(session.table)
    df = model(dbt, session)
    materialize(session, df, dbt.this)
    return "OK"
{% endmacro %}
{#
  Takes no arguments and emits only Python comment lines: instructions for running
  the generated script in Snowsight (select `main` as the entry point) or in a
  local notebook session. The body is output verbatim, so it is left untouched.
#}
{%macro py_script_comment()%}
# To run this in snowsight, you need to select entry point to be main
# And you may have to modify the return type to text to get the result back
# def main(session):
# dbt = dbtObj(session.table)
# df = model(dbt, session)
# return df.collect()
# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python
# then you can do the following to run model
# dbt = dbtObj(session.table)
# df = model(dbt, session)
{%endmacro%}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment