Making a dbt materialization that ignores certain columns

-!  🚨                                          WARNING                                          🚨  !-
This is an advanced dbt feature and is not recommended for users who are new to dbt.

The built-in dbt materialization that inserts into a target table by selecting from another source table is the incremental materialization. Out of the box, the incremental materialization requires that all columns in the target table are accounted for in the model code - this makes it challenging to exclude certain columns from being inserted by dbt (perhaps your target table has a self-incrementing integer primary key column and you want the database to increment it automatically instead of having dbt do it on your behalf).
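
To illustrate the problem, a plain incremental version of the model we build below might look like the following sketch. This is a hypothetical example: the incremental materialization builds its insert/merge column list from the columns of the existing target table, so even though user_id is omitted from the select, the generated statement still references it and the run errors out (the exact error depends on your adapter and incremental strategy).

-- models/my_model.sql (hypothetical incremental attempt)
{{ config(materialized = 'incremental') }}

-- user_id is deliberately left out of the select, but the generated
-- merge/insert still lists every column of the target table.
select user_name, user_status from development.dbt_jyeo.my_source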

We can solve this by creating our own custom materialization that inserts rows for us without having to know every column in the target table. The following steps assume you are running on Snowflake.

(1) Add the m13n.sql macro attached to this gist (reproduced in full at the bottom of this post) to your macros/ folder.

(2) Create the source table in Snowflake that we are going to select from later on:

create or replace table development.dbt_jyeo.my_source (
  user_name varchar,
  user_status varchar
);

insert into development.dbt_jyeo.my_source (user_name, user_status) 
values ('alice', 'active'), 
       ('bob', 'inactive')
;

(3) Create the target table in Snowflake with the self incrementing user_id primary key:

create or replace table development.dbt_jyeo.my_model (
  user_id int not null identity(1, 1),
  user_name varchar,
  user_status varchar
);

You can try selecting from this table to confirm that there are 0 rows in it.
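
For example, this should return 0:

select count(*) from development.dbt_jyeo.my_model;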

Note: the way the custom m13n materialization is written, it expects your target table to already exist (and will error if it does not) - this differs from the built-in dbt materializations, which create the target table for you if it does not exist.
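
If the target table is missing, the run should fail with a compilation error along these lines (the exact formatting may vary by dbt version):

Compilation Error in model my_model (models/my_model.sql)
  DEVELOPMENT.DBT_JYEO.MY_MODEL does not exist - please create it in the target database/schema first.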

(4) Add a dbt yaml file specifying our source:

# models/sources.yml
version: 2

sources:
  - name: dbt_jyeo
    tables:
      - name: my_source

(5) Add a dbt model that selects from the source using our custom m13n materialization:

-- models/my_model.sql
{{ config(materialized = 'm13n') }}

select user_name, user_status from {{ source('dbt_jyeo', 'my_source') }}

(6) Build the my_model model via dbt run - the logs should read something like the following:

01:17:49  Running with dbt=1.0.1
01:17:49  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 180 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
01:17:49  
01:17:55  Concurrency: 4 threads (target='dev')
01:17:55  
01:17:55  1 of 1 START m13n model dbt_jyeo.my_model....................................... [RUN]
01:17:59  m13n model development.dbt_jyeo.my_model.....inserting rows
01:18:01  m13n model development.dbt_jyeo.my_model.....inserted rows
01:18:02  1 of 1 OK created m13n model dbt_jyeo.my_model.................................. [SUCCESS 1 in 6.75s]
01:18:02  
01:18:02  Finished running 1 m13n model in 12.23s.
01:18:02  
01:18:02  Completed successfully
01:18:02  
01:18:02  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

(7) Selecting from the table in Snowflake should reveal that the user_id column has self-populated:
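
select * from development.dbt_jyeo.my_model;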

+---------+-----------+-------------+
| user_id | user_name | user_status |
+=========+===========+=============+
| 1       | alice     | active      |
+---------+-----------+-------------+
| 2       | bob       | inactive    |
+---------+-----------+-------------+
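
Note that the m13n materialization is insert-only - it never truncates or deletes from the target table, so running dbt run again will append the same source rows a second time (with fresh user_id values).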

As shown, being able to write your own custom materialization opens up entirely new ways to update tables with dbt - though it is worth mentioning again that custom materializations are meant for advanced dbt users and not so much for those who are just getting started.

For more help on using dbt, be sure to join the community on Slack.
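
For reference, the complete macros/m13n.sql file follows: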

{% materialization m13n, default %}
  {#
    This is a table materialization with some custom behaviour - see link below for built in table materialization.
    https://github.com/dbt-labs/dbt-core/tree/main/core/dbt/include/global_project/macros/materializations/models/table
  #}
  {%- set identifier = model['alias'] -%}
  {%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
  {%- set backup_identifier = model['name'] + '__dbt_backup' -%}

  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
  {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier, schema=schema, database=database, type='table') -%}
  {%- set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier, schema=schema, database=database) -%}
  {%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%}
  {%- set backup_relation = api.Relation.create(identifier=backup_identifier, schema=schema, database=database, type=backup_relation_type) -%}
  {%- set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier, schema=schema, database=database) -%}

  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}

  {% if old_relation is none %}
    {%- set error_msg = target_relation | upper ~ ' does not exist - please create it in the target database/schema first.' -%}
    {{ exceptions.raise_compiler_error(error_msg) }}
  {% endif %}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  -- build the intermediate relation
  {% call statement('main') -%}
    {{ get_create_table_as_sql(False, intermediate_relation, sql) }}
  {%- endcall %}

  -- build the backup relation
  {% call statement('backup_old_relation') -%}
    create table {{ backup_relation }} as select * from {{ old_relation }};
  {%- endcall %}

  -- insert into the target relation selecting from the intermediate relation
  {%- set col_list = [] -%}
  {%- set columns_in_intermediate_relation = adapter.get_columns_in_relation(intermediate_relation) -%}
  {%- for col in columns_in_intermediate_relation -%}
    {%- do col_list.append(col.name) -%}
  {%- endfor -%}

  {%- if col_list | length > 0 -%}
    {{ log('m13n model ' ~ target_relation ~ '.....inserting rows', info=true) }}
    {%- call statement('insert_rows') -%}
      insert into {{ target_relation }} ({{ col_list | join(", ") }}) select {{ col_list | join(", ") }} from {{ intermediate_relation }};
    {%- endcall -%}
    {{ log('m13n model ' ~ target_relation ~ '.....inserted rows', info=true) }}
  {%- endif -%}

  -- cleanup
  {{ drop_relation_if_exists(intermediate_relation) }}
  {% do create_indexes(target_relation) %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}
  {% do persist_docs(target_relation, model) %}

  -- `COMMIT` happens here
  {{ adapter.commit() }}

  -- finally, drop the existing/backup relation after the commit
  {{ drop_relation_if_exists(backup_relation) }}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
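
With the example model above, the insert_rows statement generated by the materialization should look roughly like the following (column names come back from adapter.get_columns_in_relation, so on Snowflake they will typically be upper case):

insert into development.dbt_jyeo.my_model (USER_NAME, USER_STATUS)
select USER_NAME, USER_STATUS
from development.dbt_jyeo.my_model__dbt_tmp;

Because user_id never appears in the statement, Snowflake is free to populate it from the identity sequence.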