Skip to content

Instantly share code, notes, and snippets.

@jeremyyeo
Last active July 11, 2024 22:52
Show Gist options
  • Save jeremyyeo/7282a2e25d86fe8b449ed70e8cdf10ff to your computer and use it in GitHub Desktop.
Save jeremyyeo/7282a2e25d86fe8b449ed70e8cdf10ff to your computer and use it in GitHub Desktop.
Debugging dbt snapshots #dbt

Debugging dbt snapshots

Loom: https://www.loom.com/share/84f2ae5463fa48048b9c578244ceb440

Note: dbt's responsiblity is to generate the same DDL/DML everytime for the same dbt sql/jinja. dbt is not responsible for making sure your data is unique, it is not responsible for the shape of your data, etc - you yourself are responsible for that.

At a high level, what we're trying to do here is to:

  1. At the start of the run, make backups of the snapshot snappy and the data it is snapshotting - the model raw.
  2. Test that after the snapshot has completed - if the dbt_scd_id is still unique.
  3. If it passes, everything is good - so we can drop the backups we created in (1).
  4. If it fails, it means some duplicates has gotten into the snapshot somehow - here we leave the backups we created in (1) so that we can come back the next day and replay all the steps so as to figure out what rows caused the duplicate. Here we also create a backup copy of raw at the end of the run so that we have the exact copy of rawas of the time we used it to insert into snappy. (This is because we may have other processes or runs that try to modify raw before we had the time to do our debugging).
# dbt_project.yml
...
on-run-start: "{{ make_backups() }}"
on-run-end: "{{ check_results_drop_backups(results) }}"

# snapshots/snappy.yml
version: 2
snapshots:
  - name: snappy
    columns:
      - name: dbt_scd_id
        tests:
          - unique
-- snapshots/snappy.sql
{% snapshot snappy %}

{{
    config(
      target_schema='dbt_jyeo_snapshots',
      unique_key='id',
      strategy='check',
      check_cols=['user_name', 'user_status']
    )
}}

select * from {{ ref('raw') }}

{% endsnapshot %}

-- models/raw.sql
{{ config(materialized='table') }}

select 1 id, 'alice' as user_name, 'inactive' as user_status
-- macros/make_backups.sql
{% macro make_ts() %}
{% set now = run_started_at.strftime("%Y%m%d_%H%M%S") %}
{% do return(now) %}
{% endmacro %}
{% macro make_backups() %}
{% if execute and target.name == 'prod' %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo', identifier='raw') %}
{% if rel %}
{% set query %}
create or replace table {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_start_{{ make_ts() }} as
select * from {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }};
{% endset %}
{% do run_query(query) %}
{% endif %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo_snapshots', identifier='snappy') %}
{% if rel %}
{% set query %}
create or replace table {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_start_{{ make_ts() }} as
select * from {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }};
{% endset %}
{% do run_query(query) %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro check_results_drop_backups(results) %}
{% if execute and target.name == 'prod' %}
{% for r in results %}
{#/* Filter results for the test of interest */#}
{#
/*
My snapshot name is 'snappy' so that's what I'm looking for in r.node.test_metadata.kwargs.model.
You will need to modify it to whatever your snapshot name is.
*/
#}
{% if r.node.resource_type == 'test' and r.node.test_metadata.name == 'unique' and 'snappy' in r.node.test_metadata.kwargs.model %}
{% if r.status == 'pass' %}
{% do log('Snapshot dbt_scd_id is unique - dropping backups.', True) %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo', identifier='raw') %}
{% set drop_model_query %}
drop table if exists {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_start_{{ make_ts() }};
{% endset %}
{% do run_query(drop_model_query) %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo_snapshots', identifier='snappy') %}
{% set drop_snapshot_query %}
drop table if exists {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_start_{{ make_ts() }};
{% endset %}
{% do run_query(drop_snapshot_query) %}
{% do log('Backups dropped.', True) %}
{% else %}
{% do log('Snapshot dbt_scd_id is not unique - backups left as is. Additionally creating another backup of raw.', True) %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo', identifier='raw') %}
{% set create_model_query %}
create or replace table {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_end_{{ make_ts() }} as
select * from {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }};
{% endset %}
{% do run_query(create_model_query) %}
{% set rel = adapter.get_relation(database='development_jyeo', schema='dbt_jyeo_snapshots', identifier='snappy') %}
{% set create_snapshot_query %}
create or replace table {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }}__backup_end_{{ make_ts() }} as
select * from {{ rel.database | lower }}.{{ rel.schema | lower }}.{{ rel.identifier | lower }};
{% endset %}
{% do run_query(create_snapshot_query) %}
{% endif %}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment