@jeremyyeo
Last active March 20, 2024 15:33
Customising dbt snapshots #dbt

Note: it is currently not possible to rename the snapshot meta-fields (dbt_valid_from and friends), even by customising the built-in macros below; see dbt-labs/dbt-core#7018.

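This gist customises dbt snapshots so that dbt_valid_from (and the dbt_valid_to of superseded rows) is taken from a variable, my_date, instead of the strategy's updated_at value (which, for the check strategy used here, is the snapshot run time).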

The macros to override, build_snapshot_table and snapshot_staging_table, live in the materializations/snapshots folder of dbt-core.

  1. Set up the initial snapshot:
-- snapshots/snappy.sql

{% snapshot snappy %}

{{
    config(
        target_database='development',
        target_schema='dbt_jyeo',
        unique_key='user_id',
        strategy='check',
        check_cols='all'
    )
}}

SELECT *
  FROM (VALUES (1, 'inactive', 123)
       ) 
    AS my_table(user_id, status, price)

{% endsnapshot %}
dbt snapshot --vars 'my_date: 1970-01-01'
USER_ID | STATUS   | PRICE | DBT_SCD_ID                       | DBT_UPDATED_AT          | DBT_VALID_FROM | DBT_VALID_TO
1       | inactive | 123   | 6fd297c5336d066f64983aca3899571a | 2021-11-22 01:02:20.909 | 1970-01-01     |
  2. Modify the snapshot to simulate a change in the source data, then snapshot again with a new variable value:
-- snapshots/snappy.sql

{% snapshot snappy %}

{{
    config(
        target_database='development',
        target_schema='dbt_jyeo',
        unique_key='user_id',
        strategy='check',
        check_cols='all'
    )
}}

SELECT *
  FROM (VALUES (1, 'active', 123)
       ) 
    AS my_table(user_id, status, price)

{% endsnapshot %}
dbt snapshot --vars 'my_date: 1970-02-01'
USER_ID | STATUS   | PRICE | DBT_SCD_ID                       | DBT_UPDATED_AT          | DBT_VALID_FROM | DBT_VALID_TO
1       | active   | 123   | 64297829e90911a07a647c9b8a0f193c | 2021-11-22 01:04:35.111 | 1970-02-01     |
1       | inactive | 123   | 6fd297c5336d066f64983aca3899571a | 2021-11-22 01:02:20.909 | 1970-01-01     | 1970-02-01 00:00:00.000
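After the second run, user_id = 1 has a clean SCD type 2 history: the closed version's dbt_valid_to lines up with the new version's dbt_valid_from. A minimal Python sketch of that sanity check, with the rows copied from the output above, might look like this. The function name check_scd2 is hypothetical, and the "exactly one open version per key" rule assumes invalidate_hard_deletes is off.

```python
from datetime import datetime
from itertools import groupby

# Rows from the second-run output above: (user_id, status, dbt_valid_from, dbt_valid_to).
rows = [
    (1, "active", "1970-02-01", None),
    (1, "inactive", "1970-01-01", "1970-02-01 00:00:00.000"),
]


def ts(value):
    """Parse a date or timestamp string; None stays None (an open version)."""
    return None if value is None else datetime.fromisoformat(value)


def check_scd2(rows):
    """Return a list of problems found in SCD type 2 history rows."""
    problems = []
    ordered = sorted(rows, key=lambda r: (r[0], ts(r[2])))
    for key, versions in groupby(ordered, key=lambda r: r[0]):
        versions = list(versions)
        # Assumes no hard deletes: every key should have exactly one open version.
        open_versions = [v for v in versions if v[3] is None]
        if len(open_versions) != 1:
            problems.append(f"user_id={key}: {len(open_versions)} open versions")
        # Each closed version should end exactly where the next one starts.
        for prev, nxt in zip(versions, versions[1:]):
            if ts(prev[3]) != ts(nxt[2]):
                problems.append(f"user_id={key}: {prev[3]} does not meet {nxt[2]}")
    return problems


print(check_scd2(rows))  # []
```

The date-only dbt_valid_from and the midnight dbt_valid_to timestamp compare equal once both are parsed, so no gap is reported.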
Macro overrides:

{% macro build_snapshot_table(strategy, sql) %}

    select *,
        {{ strategy.scd_id }} as dbt_scd_id,
        {{ strategy.updated_at }} as dbt_updated_at,
        -- {{ strategy.updated_at }} as dbt_valid_from,
        '{{ var("my_date") }}'::date as dbt_valid_from,
        nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
    from (
        {{ sql }}
    ) sbq

{% endmacro %}
{% macro snapshot_staging_table(strategy, source_sql, target_relation) %}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select *,
            {{ strategy.unique_key }} as dbt_unique_key

        from {{ target_relation }}
        where dbt_valid_to is null

    ),

    insertions_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            -- {{ strategy.updated_at }} as dbt_valid_from,
            '{{ var("my_date") }}'::date as dbt_valid_from,
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
            {{ strategy.scd_id }} as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            -- {{ strategy.updated_at }} as dbt_valid_from,
            '{{ var("my_date") }}'::date as dbt_valid_from,
            -- {{ strategy.updated_at }} as dbt_valid_to
            '{{ var("my_date") }}'::date as dbt_valid_to

        from snapshot_query
    ),

    {%- if strategy.invalidate_hard_deletes %}

    deletes_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshot_query
    ),
    {% endif %}

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*

        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                {{ strategy.row_changed }}
            )
        )

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id

        from updates_source_data as source_data
        join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where (
            {{ strategy.row_changed }}
        )
    )

    {%- if strategy.invalidate_hard_deletes -%}
    ,

    deletes as (

        -- note: hard deletes are still closed at the run time, not at my_date
        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as dbt_valid_from,
            {{ snapshot_get_time() }} as dbt_updated_at,
            {{ snapshot_get_time() }} as dbt_valid_to,
            snapshotted_data.dbt_scd_id

        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null
    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.invalidate_hard_deletes %}
    union all
    select * from deletes
    {%- endif %}

{% endmacro %}
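To see how the two overrides fit together, here is a rough, self-contained Python simulation of the two snappy runs. It is a sketch under loud assumptions: sqlite3 stands in for the warehouse, the Jinja rendering is done by hand, the first run goes through the staging path instead of build_snapshot_table (on an empty table this yields the same rows), hard deletes are ignored, and dbt's final merge is approximated with an UPDATE plus an INSERT. The snapshot() helper and the hash input are hypothetical, so dbt_scd_id values will not match the real ones.

```python
import hashlib
import sqlite3

# sqlite stands in for the warehouse; plain Python stands in for Jinja/dbt.
conn = sqlite3.connect(":memory:")
# sqlite has no md5(); register one so an scd_id-style hash can be computed.
conn.create_function("md5", 1, lambda s: hashlib.md5(s.encode()).hexdigest())
conn.execute("""
    create table snappy (
        user_id integer, status text, price integer,
        dbt_scd_id text, dbt_updated_at text,
        dbt_valid_from text, dbt_valid_to text
    )""")

# check_cols='all': compare every column null-safely ("is not" is sqlite's
# null-safe inequality). user_id is the join key, so it never differs.
ROW_CHANGED = "(s.status is not t.status or s.price is not t.price)"


def snapshot(source_sql, my_date, run_time):
    """One simplified `dbt snapshot` run (check strategy, no hard deletes)."""
    # source_sql is pasted in, much like {{ source_sql }} in the macro.
    staging = f"""
        with snapshot_query as ({source_sql}),
        snapshotted_data as (select * from snappy where dbt_valid_to is null),
        insertions as (
            select 'insert' as dbt_change_type, s.*,
                   md5(s.user_id || '|' || :run_time) as dbt_scd_id
            from snapshot_query s
            left join snapshotted_data t on t.user_id = s.user_id
            where t.user_id is null or {ROW_CHANGED}
        ),
        updates as (
            select 'update' as dbt_change_type, s.*, t.dbt_scd_id
            from snapshot_query s
            join snapshotted_data t on t.user_id = s.user_id
            where {ROW_CHANGED}
        )
        select * from insertions union all select * from updates"""
    # Materialise all staging rows first, as dbt does with its staging table.
    rows = conn.execute(staging, {"run_time": run_time}).fetchall()
    for change_type, user_id, status, price, scd_id in rows:
        if change_type == "update":
            # Close the current version at my_date instead of the run time.
            conn.execute(
                "update snappy set dbt_valid_to = ? "
                "where dbt_scd_id = ? and dbt_valid_to is null",
                (my_date, scd_id),
            )
        else:
            # The new version opens at my_date.
            conn.execute(
                "insert into snappy values (?, ?, ?, ?, ?, ?, null)",
                (user_id, status, price, scd_id, run_time, my_date),
            )


snapshot("select 1 as user_id, 'inactive' as status, 123 as price",
         "1970-01-01", "2021-11-22 01:02:20.909")
snapshot("select 1 as user_id, 'active' as status, 123 as price",
         "1970-02-01", "2021-11-22 01:04:35.111")

for row in conn.execute("select user_id, status, dbt_valid_from, dbt_valid_to "
                        "from snappy order by dbt_valid_from desc"):
    print(row)
# (1, 'active', '1970-02-01', None)
# (1, 'inactive', '1970-01-01', '1970-02-01')
```

Because sqlite stores these values as text, the date-versus-timestamp display difference seen in the warehouse output (1970-01-01 versus 1970-02-01 00:00:00.000) does not show up here.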
9werayut commented May 16, 2022

Hi @jeremyyeo
I have a question about using snapshots for an SCD type 2 dimension. The first data load works, but the second load, after the data has changed, causes a problem. For example, I changed localfirstname and modifiendat for the row where _id = '5f2965bd8fac2ee61b6d6cac', and that row was versioned correctly according to SCD type 2 principles. However, the row with id = '5f2965ba8fac2ee61b69e3c0' also got a valid_to. Can you suggest a fix?

[screenshot]

jeremyyeo (Author) commented May 30, 2022

BigQuery Testing (dbt 1.0.7)

Macro overrides:

-- macros/overrides.sql
{% macro build_snapshot_table(strategy, sql) %}

    select *,
        {{ strategy.scd_id }} as dbt_scd_id,
        {{ strategy.updated_at }} as dbt_updated_at,
        cast(parse_date('%Y-%m-%d', '{{ var("my_date") }}') as timestamp) as dbt_valid_from,
        nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
    from (
        {{ sql }}
    ) sbq

{% endmacro %}


{% macro snapshot_staging_table(strategy, source_sql, target_relation) %}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select *,
            {{ strategy.unique_key }} as dbt_unique_key

        from {{ target_relation }}
        where dbt_valid_to is null

    ),

    insertions_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            cast(parse_date('%Y-%m-%d', '{{ var("my_date") }}') as timestamp) as dbt_valid_from,
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
            {{ strategy.scd_id }} as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            cast(parse_date('%Y-%m-%d', '{{ var("my_date") }}') as timestamp) as dbt_valid_from,
            cast(parse_date('%Y-%m-%d', '{{ var("my_date") }}') as timestamp) as dbt_valid_to

        from snapshot_query
    ),

    {%- if strategy.invalidate_hard_deletes %}

    deletes_source_data as (

        select 
            *,
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshot_query
    ),
    {% endif %}

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*

        from insertions_source_data as source_data
        left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
            and (
                {{ strategy.row_changed }}
            )
        )

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id

        from updates_source_data as source_data
        join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where (
            {{ strategy.row_changed }}
        )
    )

    {%- if strategy.invalidate_hard_deletes -%}
    ,

    deletes as (
    
        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as dbt_valid_from,
            {{ snapshot_get_time() }} as dbt_updated_at,
            {{ snapshot_get_time() }} as dbt_valid_to,
            snapshotted_data.dbt_scd_id
    
        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null
    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.invalidate_hard_deletes %}
    union all
    select * from deletes
    {%- endif %}

{% endmacro %}
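The BigQuery override parses my_date with parse_date('%Y-%m-%d', ...) and casts it to a midnight timestamp, so a malformed value only fails once the SQL reaches BigQuery. A small Python wrapper could apply the same format up front and build the argument list for dbt snapshot --vars. This is a sketch: snapshot_args is a hypothetical helper, not part of dbt, and Python's strptime is close to, but not guaranteed identical to, BigQuery's date parser.

```python
from datetime import datetime


def snapshot_args(my_date):
    """Validate my_date as YYYY-MM-DD and return argv for `dbt snapshot`.

    Hypothetical helper: it mirrors the format string used by the BigQuery
    override's parse_date('%Y-%m-%d', ...) call.
    """
    # Raises ValueError for values such as "1970/01/01" or "1970-13-01".
    parsed = datetime.strptime(my_date, "%Y-%m-%d")
    # Like cast(parse_date(...) as timestamp), the parsed value sits at midnight;
    # re-formatting also normalises the value to zero-padded YYYY-MM-DD.
    return ["dbt", "snapshot", "--vars", f"my_date: {parsed:%Y-%m-%d}"]


print(snapshot_args("1970-02-01"))
# ['dbt', 'snapshot', '--vars', 'my_date: 1970-02-01']
```

The returned list can then be handed to subprocess.run to execute the snapshot.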

Steps:

  1. Add an initial snapshot:
-- snapshots/snappy.sql

{% snapshot snappy %}

{{
    config(
        target_database='cse-sandbox-319708',
        target_schema='dbt_jyeo',
        unique_key='user_id',
        strategy='check',
        check_cols='all'
    )
}}

select 1 as user_id, 'active' as status, 123 as price
union all
select 2 as user_id, 'inactive' as status, 123 as price

{% endsnapshot %}
  2. Run dbt snapshot --vars 'my_date: 1970-01-01' and check the output:

[screenshot of the snapshot output]

  3. Tweak the snapshot so that user_id = 1 changes from active to inactive:
-- snapshots/snappy.sql

{% snapshot snappy %}

{{
    config(
        target_database='cse-sandbox-319708',
        target_schema='dbt_jyeo',
        unique_key='user_id',
        strategy='check',
        check_cols='all'
    )
}}

select 1 as user_id, 'inactive' as status, 123 as price
union all
select 2 as user_id, 'inactive' as status, 123 as price

{% endsnapshot %}
  4. Run dbt snapshot --vars 'my_date: 1970-02-01' and check the output:

[screenshot of the snapshot output]

jeremyyeo (Author) commented May 30, 2022

Hey @9werayut, I added a BigQuery example above. It works as expected: only the status of user_id = 1 changed, and the dbt_valid_to of the user_id = 2 row remained unchanged. I think something else is at play in your snapshot.
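The reply rests on how the check strategy decides that a row changed: with check_cols='all', a key is versioned only if at least one column differs from its open snapshot row, with nulls compared null-safely. A Python sketch of that per-column test, applied to the two BigQuery runs, shows why only user_id = 1 is versioned. row_changed and the dict-based rows are illustrative, not dbt's API. By the same logic, a row that is closed unexpectedly means some check column differed for that key or, with invalidate_hard_deletes enabled, the key was missing from the source.

```python
def row_changed(snapshotted, current, check_cols):
    """True if any check column differs, treating NULL (None) null-safely."""
    for col in check_cols:
        old, new = snapshotted.get(col), current.get(col)
        if (old is None) != (new is None):  # exactly one side is null
            return True
        if old is not None and old != new:  # both present but different
            return True
    return False


# Open snapshot rows after the first BigQuery run, keyed by user_id.
first_run = {
    1: {"user_id": 1, "status": "active", "price": 123},
    2: {"user_id": 2, "status": "inactive", "price": 123},
}
# Source rows for the second run.
second_run = {
    1: {"user_id": 1, "status": "inactive", "price": 123},
    2: {"user_id": 2, "status": "inactive", "price": 123},
}

check_cols = ["user_id", "status", "price"]  # what check_cols='all' covers here
changed = [key for key, row in second_run.items()
           if key in first_run and row_changed(first_run[key], row, check_cols)]
print(changed)  # [1]
```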
