@jeremyyeo
Last active August 16, 2023 23:21
Dynamically generating `where` parameters to the `dbt_utils.union_relations` macro #dbt

Dynamically generating where parameters to the dbt_utils.union_relations macro

-!  🚨                                          WARNING                                          🚨  !-
There is basically no reason for you to do this - written just to show that it's possible.

Let's assume for a second that we have a few incremental models that we want to union with dbt_utils.union_relations().

-- models/foo_1.sql
{{ config(materialized = 'incremental') }}

select current_timestamp() as updated_at, 1 as id

-- models/foo_2.sql
{{ config(materialized = 'incremental') }}

select current_timestamp() as updated_at, 2 as id

Each of the models above simply inserts one new row every time it runs. Next, we want to union them:

-- models/foo.sql
{{ config(materialized = 'incremental') }}

with source_data as (

    select * from
    {{
        dbt_utils.union_relations(
            relations=[ref('foo_1'), ref('foo_2')]
        )
    }}

)

select * from source_data

Let's build twice in a row and see what we get:

dbt run --full-refresh
dbt run

image

image

Now we can see duplicated rows in foo, because we are not filtering on subsequent runs of the foo model. Let's add the if is_incremental() check that incremental models typically have to the foo model:

-- models/foo.sql
{{ config(materialized = 'incremental') }}

with source_data as (

    select * from
    {{
        dbt_utils.union_relations(
            relations=[ref('foo_1'), ref('foo_2')]
        )
    }}

)

select * from source_data
{% if is_incremental() %}
 where updated_at > (select max(updated_at) from {{ this }})
{% endif %}

And run it back:

dbt run --full-refresh
dbt run

image

image

Okay, now things are working as expected.
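If you'd like to reproduce the two runs without a warehouse, here's a rough sketch using Python's built-in sqlite3 module. It is only a stand-in for what dbt does: the run_twice function, the integer updated_at tick (in place of current_timestamp()) and the use_filter flag (in place of is_incremental()) are all assumptions made for illustration.

```python
import sqlite3


def run_twice(use_filter: bool) -> int:
    """Simulate `dbt run --full-refresh` then `dbt run`; return the row count of foo."""
    con = sqlite3.connect(":memory:")
    for table in ("foo_1", "foo_2", "foo"):
        con.execute(f"create table {table} (updated_at integer, id integer)")

    for tick in (1, 2):  # tick 1 = full refresh, tick 2 = incremental run
        # Each upstream model appends exactly one new row per run.
        con.execute("insert into foo_1 values (?, 1)", (tick,))
        con.execute("insert into foo_2 values (?, 2)", (tick,))

        where = ""
        if use_filter and tick > 1:  # stands in for is_incremental()
            where = "where updated_at > (select max(updated_at) from foo)"

        # foo appends whatever the (optionally filtered) union returns.
        con.execute(
            "insert into foo "
            "select * from (select * from foo_1 union all select * from foo_2) "
            + where
        )

    return con.execute("select count(*) from foo").fetchone()[0]


print(run_twice(use_filter=False))  # 6 rows: the first run's rows were inserted again
print(run_twice(use_filter=True))   # 4 rows: only the 2 new rows were added
```

Without the filter, the second run re-inserts the two rows that were already loaded; with it, only rows newer than max(updated_at) in foo make it through.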

Now, for some reason or another, you may instead want to use the where argument of dbt_utils.union_relations to do this filtering. Your first inclination might be to do:

-- models/foo.sql
{{ config(materialized = 'incremental') }}

with source_data as (

    select * from
    {{
        dbt_utils.union_relations(
            relations=[ref('foo_1'), ref('foo_2')],
            where='{% if is_incremental() %} updated_at > (select max(updated_at) from {{ this }}) {% endif %}'
        )
    }}

)

select * from source_data

This simply copy-pastes the if is_incremental() logic as a string into the where param. Now, when we run:

dbt run --full-refresh

We are immediately hit with an error (we didn't even get to execute the second dbt run for that matter):

00:39:27  Running with dbt=1.4.4
00:39:29  Found 3 models, 0 tests, 0 snapshots, 0 analyses, 417 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
00:39:29  
00:39:34  Concurrency: 1 threads (target='dev')
00:39:34  
00:39:34  1 of 3 START sql incremental model dbt_jyeo.foo_1 .............................. [RUN]
00:39:40  1 of 3 OK created sql incremental model dbt_jyeo.foo_1 ......................... [SUCCESS 1 in 5.49s]
00:39:40  2 of 3 START sql incremental model dbt_jyeo.foo_2 .............................. [RUN]
00:39:45  2 of 3 OK created sql incremental model dbt_jyeo.foo_2 ......................... [SUCCESS 1 in 5.10s]
00:39:45  3 of 3 START sql incremental model dbt_jyeo.foo ................................ [RUN]
00:39:50  3 of 3 ERROR creating sql incremental model dbt_jyeo.foo ....................... [ERROR in 4.80s]
00:39:50  
00:39:50  Finished running 3 incremental models in 0 hours 0 minutes and 21.23 seconds (21.23s).
00:39:50  
00:39:50  Completed with 1 error and 0 warnings:
00:39:50  
00:39:50  Database Error in model foo (models/foo.sql)
00:39:50    001003 (42000): SQL compilation error:
00:39:50    syntax error line 20 at position 19 unexpected '%'.
00:39:50    syntax error line 20 at position 24 unexpected 'is_incremental'.
00:39:50    syntax error line 20 at position 81 unexpected 'from'.
00:39:50    compiled Code at target/run/my_dbt_project/models/foo.sql
00:39:50  
00:39:50  Done. PASS=2 WARN=0 ERROR=1 SKIP=0 TOTAL=3

Now why is that? Let's look at the debug logs to find the SQL that was generated out of the foo model:

create or replace transient table development_jyeo.dbt_jyeo.foo  as
        (

with source_data as (

    select * from
    
    

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_1' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_1

            where {% if is_incremental() %} updated_at > (select max(updated_at) from {{ this }}) {% endif %}
        )

        union all
        

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_2' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_2

            where {% if is_incremental() %} updated_at > (select max(updated_at) from {{ this }}) {% endif %}
        )

        

)

select * from source_data
        );

Notice the problem? Around line 20 of the compiled code, we have invalid SQL:

where {% if is_incremental() %} updated_at > (select max(updated_at) from {{ this }}) {% endif %}
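As an aside, leftover Jinja like this is easy to spot mechanically. Here's a small sketch of a checker you could point at compiled SQL; find_unrendered_jinja is a hypothetical helper (not part of dbt or dbt_utils), and the pattern is deliberately naive: it just looks for Jinja opening delimiters.

```python
import re

# Jinja statement, expression, or comment openers that should not survive compilation.
JINJA_OPENER = re.compile(r"\{[%{#]")


def find_unrendered_jinja(sql: str) -> list[tuple[int, str]]:
    """Return (line number, stripped line) for every line that still contains Jinja."""
    return [
        (number, line.strip())
        for number, line in enumerate(sql.splitlines(), start=1)
        if JINJA_OPENER.search(line)
    ]


compiled_sql = """select * from development_jyeo.dbt_jyeo.foo_1
where {% if is_incremental() %} updated_at > (select max(updated_at) from {{ this }}) {% endif %}"""

leftovers = find_unrendered_jinja(compiled_sql)
for number, line in leftovers:
    print(f"line {number}: {line}")
```

It flags the second line of the sample, which is the same kind of line the database complained about.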

Simply put, we passed a string into the where parameter of the dbt_utils.union_relations() macro, and the macro pasted that string verbatim into the SQL it generated. This is because Jinja inside a string argument to a macro IS NOT rendered - the string is simply taken at face value. The {% if %} block is never evaluated, and {{ this }} does not get converted to development_jyeo.dbt_jyeo.foo, which is what you may have expected here.
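To make the "taken at face value" point concrete, here's a toy illustration in plain Python. It is not Jinja and not how dbt is implemented: render_once is a made-up single-pass renderer that only understands {{ name }} placeholders, but it shows the same effect of a substituted value not being rendered a second time.

```python
import re


def render_once(template: str, context: dict) -> str:
    """Replace each {{ name }} with context[name] in one pass; inserted text is not rescanned."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


# Stand-in for the SQL the macro emits: it just drops `where` into place.
macro_body = "select * from foo_1 where {{ where }}"

# The argument is a plain string that happens to contain template syntax.
raw_filter = "updated_at > (select max(updated_at) from {{ this }})"

compiled = render_once(
    macro_body,
    {"where": raw_filter, "this": "development_jyeo.dbt_jyeo.foo"},
)
print(compiled)
# select * from foo_1 where updated_at > (select max(updated_at) from {{ this }})
```

Even though this is available in the context, the {{ this }} inside the string survives untouched, because the string was only ever inserted, never rendered.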

So what can we do? Let's FIRST generate valid SQL text, assign it to a variable, AND THEN pass that variable in to the where parameter. Let's do just that:

-- models/foo.sql
{{ config(materialized = 'incremental') }}

{% set my_filter -%}
    {% if is_incremental() %}
        updated_at > (select max(updated_at) from {{ this }})
    {% else %}
        1 = 1
    {% endif %}
{%- endset %}

with source_data as (

    select * from
    {{
        dbt_utils.union_relations(
            relations=[ref('foo_1'), ref('foo_2')],
            where=my_filter
        )
    }}

)

select * from source_data

Run a full refresh first:

$ dbt run --full-refresh
00:41:19  Running with dbt=1.4.4
00:41:20  Found 3 models, 0 tests, 0 snapshots, 0 analyses, 417 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
00:41:20  
00:41:26  Concurrency: 1 threads (target='dev')
00:41:26  
00:41:26  1 of 3 START sql incremental model dbt_jyeo.foo_1 .............................. [RUN]
00:41:31  1 of 3 OK created sql incremental model dbt_jyeo.foo_1 ......................... [SUCCESS 1 in 4.86s]
00:41:31  2 of 3 START sql incremental model dbt_jyeo.foo_2 .............................. [RUN]
00:41:36  2 of 3 OK created sql incremental model dbt_jyeo.foo_2 ......................... [SUCCESS 1 in 4.69s]
00:41:36  3 of 3 START sql incremental model dbt_jyeo.foo ................................ [RUN]
00:41:41  3 of 3 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 1 in 5.84s]
00:41:41  
00:41:41  Finished running 3 incremental models in 0 hours 0 minutes and 20.93 seconds (20.93s).
00:41:41  
00:41:41  Completed successfully
00:41:41  
00:41:41  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

Okay, the first go-round looks good... what did the generated SQL statement look like?

create or replace transient table development_jyeo.dbt_jyeo.foo  as
        (



with source_data as (

    select * from
    
    

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_1' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_1

            where 
        1 = 1
    
        )

        union all
        

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_2' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_2

            where 
        1 = 1
    
        )

        

)

select * from source_data
        );

Aha... okay 1=1 looks good. Now what about the subsequent run:

$ dbt run
00:43:48  Running with dbt=1.4.4
00:43:49  Found 3 models, 0 tests, 0 snapshots, 0 analyses, 417 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
00:43:49  
00:43:54  Concurrency: 1 threads (target='dev')
00:43:54  
00:43:55  1 of 3 START sql incremental model dbt_jyeo.foo_1 .............................. [RUN]
00:44:04  1 of 3 OK created sql incremental model dbt_jyeo.foo_1 ......................... [SUCCESS 1 in 9.20s]
00:44:04  2 of 3 START sql incremental model dbt_jyeo.foo_2 .............................. [RUN]
00:44:13  2 of 3 OK created sql incremental model dbt_jyeo.foo_2 ......................... [SUCCESS 1 in 8.93s]
00:44:13  3 of 3 START sql incremental model dbt_jyeo.foo ................................ [RUN]
00:44:22  3 of 3 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 2 in 9.39s]
00:44:22  
00:44:22  Finished running 3 incremental models in 0 hours 0 minutes and 33.20 seconds (33.20s).
00:44:22  
00:44:22  Completed successfully
00:44:22  
00:44:22  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3

No issues... okay, what about the generated SQL statement?

create or replace  temporary view development_jyeo.dbt_jyeo.foo__dbt_tmp
  
   as (
    



with source_data as (

    select * from
    
    

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_1' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_1

            where 
        updated_at > (select max(updated_at) from development_jyeo.dbt_jyeo.foo)
    
        )

        union all
        

        (
            select
                cast('development_jyeo.dbt_jyeo.foo_2' as TEXT) as _dbt_source_relation,

                
                    cast("UPDATED_AT" as TIMESTAMP_LTZ) as "UPDATED_AT" ,
                    cast("ID" as NUMBER(1,0)) as "ID" 

            from development_jyeo.dbt_jyeo.foo_2

            where 
        updated_at > (select max(updated_at) from development_jyeo.dbt_jyeo.foo)
    
        )

        

)

select * from source_data
  );

Excellent - now we have the right where clause in place instead of some invalid SQL text. And finally just to validate our data:

image

image
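Why did the {% set %} version work? The block is rendered into a plain string before it is handed to the macro. Here's the same idea as a toy sketch in plain Python; again, this is not Jinja or dbt, and render_once and build_filter are made-up names that only mimic the order of operations.

```python
import re


def render_once(template: str, context: dict) -> str:
    """Replace each {{ name }} with context[name] in one pass; inserted text is not rescanned."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


def build_filter(is_incremental: bool, this: str) -> str:
    """Mimic the set block: produce finished SQL text before the macro ever sees it."""
    if is_incremental:
        return render_once(
            "updated_at > (select max(updated_at) from {{ this }})",
            {"this": this},
        )
    return "1 = 1"


# Stand-in for the SQL the macro emits: it just drops `where` into place.
macro_body = "select * from foo_1 where {{ where }}"
this = "development_jyeo.dbt_jyeo.foo"

first_run = render_once(macro_body, {"where": build_filter(False, this)})
second_run = render_once(macro_body, {"where": build_filter(True, this)})

print(first_run)
# select * from foo_1 where 1 = 1
print(second_run)
# select * from foo_1 where updated_at > (select max(updated_at) from development_jyeo.dbt_jyeo.foo)
```

By the time the filter reaches the macro it is already plain SQL, so inserting it verbatim is exactly what we want.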

Done... and you've also learned that there is almost no reason why you would want to do this.
