@jeremyyeo
Last active October 9, 2024 09:30
Freshness checking models instead of sources #dbt

Freshness checking models instead of sources

dbt can check the freshness (i.e. how up to date the data is) of a particular source. That functionality is reserved exclusively for sources and not models - i.e. if you tried to apply source freshness configs to a model like you would to a source:

# models/schemas.yml
version: 2
models:
  - name: foo
    loaded_at_field: updated_at
    freshness:
      error_after: { count: 48, period: hour }
...

You will find that neither dbt source freshness nor dbt test will actually do any "freshness checking" of our foo model above. One motivation for this is that in any dbt project, raw data normally comes in as sources, and your models then transform that data by selecting from those raw sources. Essentially, if your source data is fresh, there is usually little reason to "freshness check" your models as well. Of course, given that dbt is basically just a jinja + sql compiler - the sky is the limit to what users attempt to do with it.

Now, while the built in source freshness checks are for sources only, dbt does provide the flexibility for an end user to create their own generic tests that they can then apply to models. Let's see if we can create a generic test that closely achieves the outcome of what source freshness checks do.

Since this is a more advanced topic - there are a couple of key things to understand here.

Below, I will be using "fail" and "error" interchangeably. This is opposed to "warn" which is different from "fail"/"error".

  1. By default, a dbt test will result in a fail if the test selection includes at least 1 record.
  2. Also by default, a dbt test will not emit a warning - it is always fail or not fail (i.e. pass).
  3. It is possible to configure the severity of a generic test - such that a test emits a warning if there are X records and fails / errors if there are Y records (https://docs.getdbt.com/reference/resource-configs/severity).
  4. Critically, it is important to understand that a normal generic test (the built in ones like not_null, unique, etc. or one that a user writes themselves) uses the "number of rows" returned to decide which status to emit (fail/warn/pass). This is not the same as the out of the box source freshness checks, which are not concerned with the "number of rows" but instead check whether the age of the max timestamp of a certain column exceeds some error or warning threshold. All this is to say, the code path that "decides" the status of a generic test is completely different from the code path that "decides" the status of a source freshness check (which is, after all, not a "test" but its own thing entirely).
  5. To replicate a source freshness check - our generic test will need to support 3 types of freshness configs:
    • Only warn_after is configured.
    • Only error_after is configured.
    • Both warn_after and error_after are configured.
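
Points 1 to 3 above can be modeled roughly in Python. This is only an illustrative sketch and not dbt's actual implementation: the names condition_met and status_from_failures are hypothetical, and the sketch assumes conditions are written like =1, !=0 or >10, with !=0 as the default for both warn_if and error_if.

```python
import operator

# Operators a condition string such as "=1" or "!=0" may start with.
# Two-character operators come first so ">=" is not mistaken for ">".
_OPS = [
    ("!=", operator.ne),
    (">=", operator.ge),
    ("<=", operator.le),
    ("=", operator.eq),
    (">", operator.gt),
    ("<", operator.lt),
]


def condition_met(failures: int, condition: str) -> bool:
    """Evaluate a condition like '=1' against the number of rows returned."""
    condition = condition.replace(" ", "")
    for symbol, compare in _OPS:
        if condition.startswith(symbol):
            return compare(failures, int(condition[len(symbol):]))
    raise ValueError(f"Unsupported condition: {condition!r}")


def status_from_failures(failures: int, warn_if: str = "!=0",
                         error_if: str = "!=0", severity: str = "error") -> str:
    """Hypothetical model of how a row count becomes fail / warn / pass."""
    # The error condition is only considered when severity is "error".
    if severity == "error" and condition_met(failures, error_if):
        return "fail"
    if condition_met(failures, warn_if):
        return "warn"
    return "pass"


print(status_from_failures(1))                               # defaults
print(status_from_failures(1, warn_if="=1", error_if="=2"))  # custom thresholds
print(status_from_failures(2, warn_if="=1", error_if="=2"))
```

With the defaults, a single returned row is already a fail, which is why a test never warns unless its severity or its warn_if / error_if conditions are changed.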

For the purposes of our replication, we will set it up so that:

  • The model freshness checks will fail if the timestamp is more than 48 hours old.
  • The model freshness checks will warn if the timestamp is more than 12 hours old.
  • The model freshness checks will pass otherwise.

This is tested on Snowflake only.

First, let's set up our dbt project.

# dbt_project.yml

name: my_dbt_project
profile: all
config-version: 2
version: "1.0.0"

models:
  my_dbt_project:
    +materialized: table

And these are the 7 models that we will be using to test all our conditions and combinations of conditions:

-- models/bar_should_fail_with_error_config.sql
select current_timestamp() - interval '60 hour' as updated_at, 1 as id

-- models/bar_should_pass_with_error_config.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id

-- models/bar_should_pass_with_warn_config.sql
select current_timestamp() as updated_at, 1 as id

-- models/bar_should_warn_with_warn_config.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id

-- models/foo_should_fail.sql
select current_timestamp() - interval '60 hour' as updated_at, 1 as id

-- models/foo_should_pass.sql
select current_timestamp() as updated_at, 1 as id

-- models/foo_should_warn.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id

And the associated schema yml file where we will be applying our tests to:

# models/schemas.yml
version: 2
models:
  - name: bar_should_fail_with_error_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          error_after: { count: 48, period: hour }
  - name: bar_should_pass_with_error_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          error_after: { count: 48, period: hour }
  - name: bar_should_pass_with_warn_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
  - name: bar_should_warn_with_warn_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
  - name: foo_should_fail
    tests: &freshness_tests
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
          error_after: { count: 48, period: hour }
  - name: foo_should_pass
    tests: *freshness_tests
  - name: foo_should_warn
    tests: *freshness_tests

The tests on foo_ models make use of yaml anchors to copy the test from one model to the next without having to rewrite them.

If you pay close attention to the above model code + schema yaml test declarations - you'll easily identify which model tests are supposed to result in which test status (fail/warn/pass). For example, the freshness test on the model bar_should_fail_with_error_config should error because the updated_at column is created with current_timestamp() - interval '60 hour' which exceeds the error_after: { count: 48, period: hour } config.
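
To make the expected outcomes explicit, here is a small Python sketch that classifies each of the seven models from the age of its updated_at value and the thresholds in the schema yml above. The function name expected_status is hypothetical and the sketch only mirrors the intent of a freshness check - it is not how dbt evaluates anything.

```python
from typing import Optional


def expected_status(age_hours: float,
                    warn_after: Optional[float] = None,
                    error_after: Optional[float] = None) -> str:
    """Hypothetical helper: classify a timestamp's age against optional thresholds."""
    if warn_after is None and error_after is None:
        raise ValueError("At least one of warn_after / error_after is required.")
    # The error threshold takes priority over the warn threshold.
    if error_after is not None and age_hours > error_after:
        return "fail"
    if warn_after is not None and age_hours > warn_after:
        return "warn"
    return "pass"


# (age of updated_at in hours, warn_after hours, error_after hours),
# copied from the model SQL and the schema yml above.
MODELS = {
    "bar_should_fail_with_error_config": (60, None, 48),
    "bar_should_pass_with_error_config": (36, None, 48),
    "bar_should_pass_with_warn_config": (0, 12, None),
    "bar_should_warn_with_warn_config": (36, 12, None),
    "foo_should_fail": (60, 12, 48),
    "foo_should_pass": (0, 12, 48),
    "foo_should_warn": (36, 12, 48),
}

for name, (age, warn, error) in MODELS.items():
    print(f"{name}: {expected_status(age, warn, error)}")
```

Note that a 36 hour old timestamp passes when only error_after is configured (bar_should_pass_with_error_config) but warns when a 12 hour warn_after is present.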

Now, here comes the "magic", the generic test macro itself:

-- tests/generic/freshness.sql
{% test freshness(model, loaded_at_field, warn_after=none, error_after=none) %}

{% if warn_after and error_after %}

    {{ config(warn_if='=1', error_if='=2') }}

    with check_conditions as (
        select count(*) as num_rows, max({{loaded_at_field}}) as last_loaded_at, 'warn' as condition 
          from {{ model }}
         where {{ loaded_at_field }} < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')
         union
        select count(*) as num_rows, max({{loaded_at_field}}) as last_loaded_at, 'error' as condition 
          from {{ model }}
         where {{ loaded_at_field }} < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')
    )

{% elif warn_after %}

    {{ config(warn_if='=1', error_if='=2') }}

    with check_conditions as (
        select count(*) as num_rows, max({{loaded_at_field}}) as last_loaded_at, 'warn' as condition 
          from {{ model }}
         where {{ loaded_at_field }} < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')
    )

{% elif error_after %}

    {{ config(error_if='=1') }}

    with check_conditions as (
        select count(*) as num_rows, max({{loaded_at_field}}) as last_loaded_at, 'error' as condition 
          from {{ model }}
         where {{ loaded_at_field }} < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')
    )
{% else %}
    {% do exceptions.raise_compiler_error('Freshness test applied to model ' ~ model ~ ' but missing warn_after/error_after configs.') %}
{% endif %}

select * from check_conditions
 where num_rows > 0

{% endtest %}

The general idea here is that, for tests that have both warn_after and error_after configured, if a timestamp column exceeds the error_after config - then it surely should also exceed any warn_after configs. This means that:

  • If the timestamp exceeds the error_after threshold - our test selection returns 2 rows.
  • If the timestamp exceeds only the warn_after threshold - our test selection returns 1 row.
  • And 0 rows otherwise.

Note that we make use of the warn_if / error_if test configurations as well in order to emit the right status (fail/warn/pass).
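
The row counts above can be reproduced with a rough Python model of the branch that has both thresholds configured. The function name rows_returned and the fixed "now" value are assumptions made for illustration; the logic simply mimics the union of the two filtered aggregates followed by the num_rows > 0 filter.

```python
from datetime import datetime, timedelta


def rows_returned(timestamps, now, warn_hours, error_hours):
    """Hypothetical model of the rows the test query returns."""
    rows = []
    for condition, hours in (("warn", warn_hours), ("error", error_hours)):
        cutoff = now - timedelta(hours=hours)
        # Equivalent of: where loaded_at_field < cutoff
        stale = [ts for ts in timestamps if ts < cutoff]
        # An aggregate without a group by always yields exactly one row,
        # even when no input rows matched the filter.
        rows.append((len(stale), max(stale) if stale else None, condition))
    # Equivalent of the final: where num_rows > 0
    return [row for row in rows if row[0] > 0]


now = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary fixed "current timestamp"
for age in (60, 36, 0):
    result = rows_returned([now - timedelta(hours=age)], now,
                           warn_hours=12, error_hours=48)
    print(age, len(result), [row[2] for row in result])
```

For the single-row models used here, a 60 hour old timestamp yields 2 rows, a 36 hour old one yields 1 row and a current one yields 0 rows, which is what the warn_if / error_if conditions key off.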

Let's now build our models and run our tests and see the outcome.

$ dbt build
09:59:51  Running with dbt=1.5.8
09:59:52  Registered adapter: snowflake=1.5.5
09:59:52  Found 7 models, 7 tests, 0 snapshots, 0 analyses, 326 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
09:59:52  
09:59:56  Concurrency: 1 threads (target='sf')
09:59:56  
09:59:56  1 of 14 START sql table model dbt_jyeo.bar_should_fail_with_error_config ....... [RUN]
09:59:58  1 of 14 OK created sql table model dbt_jyeo.bar_should_fail_with_error_config .. [SUCCESS 1 in 2.50s]
09:59:58  2 of 14 START sql table model dbt_jyeo.bar_should_pass_with_error_config ....... [RUN]
10:00:00  2 of 14 OK created sql table model dbt_jyeo.bar_should_pass_with_error_config .. [SUCCESS 1 in 2.00s]
10:00:00  3 of 14 START sql table model dbt_jyeo.bar_should_pass_with_warn_config ........ [RUN]
10:00:02  3 of 14 OK created sql table model dbt_jyeo.bar_should_pass_with_warn_config ... [SUCCESS 1 in 1.79s]
10:00:02  4 of 14 START sql table model dbt_jyeo.bar_should_warn_with_warn_config ........ [RUN]
10:00:04  4 of 14 OK created sql table model dbt_jyeo.bar_should_warn_with_warn_config ... [SUCCESS 1 in 2.18s]
10:00:04  5 of 14 START sql table model dbt_jyeo.foo_should_fail ......................... [RUN]
10:00:06  5 of 14 OK created sql table model dbt_jyeo.foo_should_fail .................... [SUCCESS 1 in 1.88s]
10:00:06  6 of 14 START sql table model dbt_jyeo.foo_should_pass ......................... [RUN]
10:00:08  6 of 14 OK created sql table model dbt_jyeo.foo_should_pass .................... [SUCCESS 1 in 2.08s]
10:00:08  7 of 14 START sql table model dbt_jyeo.foo_should_warn ......................... [RUN]
10:00:10  7 of 14 OK created sql table model dbt_jyeo.foo_should_warn .................... [SUCCESS 1 in 1.81s]
10:00:10  8 of 14 START test freshness_bar_should_fail_with_error_config_48__hour__updated_at  [RUN]
10:00:12  8 of 14 FAIL 1 freshness_bar_should_fail_with_error_config_48__hour__updated_at  [FAIL 1 in 1.65s]
10:00:12  9 of 14 START test freshness_bar_should_pass_with_error_config_48__hour__updated_at  [RUN]
10:00:13  9 of 14 PASS freshness_bar_should_pass_with_error_config_48__hour__updated_at .. [PASS in 1.59s]
10:00:13  10 of 14 START test freshness_bar_should_pass_with_warn_config_updated_at__12__hour  [RUN]
10:00:15  10 of 14 PASS freshness_bar_should_pass_with_warn_config_updated_at__12__hour .. [PASS in 1.45s]
10:00:15  11 of 14 START test freshness_bar_should_warn_with_warn_config_updated_at__12__hour  [RUN]
10:00:16  11 of 14 WARN 1 freshness_bar_should_warn_with_warn_config_updated_at__12__hour  [WARN 1 in 1.56s]
10:00:16  12 of 14 START test freshness_foo_should_fail_48__hour__updated_at__12__hour ... [RUN]
10:00:18  12 of 14 FAIL 2 freshness_foo_should_fail_48__hour__updated_at__12__hour ....... [FAIL 2 in 1.51s]
10:00:18  13 of 14 START test freshness_foo_should_pass_48__hour__updated_at__12__hour ... [RUN]
10:00:20  13 of 14 PASS freshness_foo_should_pass_48__hour__updated_at__12__hour ......... [PASS in 1.59s]
10:00:20  14 of 14 START test freshness_foo_should_warn_48__hour__updated_at__12__hour ... [RUN]
10:00:21  14 of 14 WARN 1 freshness_foo_should_warn_48__hour__updated_at__12__hour ....... [WARN 1 in 1.62s]
10:00:21  
10:00:21  Finished running 7 table models, 7 tests in 0 hours 0 minutes and 28.64 seconds (28.64s).
10:00:21  
10:00:21  Completed with 2 errors and 2 warnings:
10:00:21  
10:00:21  Failure in test freshness_bar_should_fail_with_error_config_48__hour__updated_at (models/schemas.yml)
10:00:21    Got 1 result, configured to fail if =1
10:00:21  
10:00:21    compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_bar_should_fail_with_186fbb3725139ea8c0ad8614bc48f256.sql
10:00:21  
10:00:21  Failure in test freshness_foo_should_fail_48__hour__updated_at__12__hour (models/schemas.yml)
10:00:21    Got 2 results, configured to fail if =2
10:00:21  
10:00:21    compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_foo_should_fail_48__hour__updated_at__12__hour.sql
10:00:21  
10:00:21  Warning in test freshness_bar_should_warn_with_warn_config_updated_at__12__hour (models/schemas.yml)
10:00:21  Got 1 result, configured to warn if =1
10:00:21  
10:00:21    compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_bar_should_warn_with_warn_config_updated_at__12__hour.sql
10:00:21  
10:00:21  Warning in test freshness_foo_should_warn_48__hour__updated_at__12__hour (models/schemas.yml)
10:00:21  Got 1 result, configured to warn if =1
10:00:21  
10:00:21    compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_foo_should_warn_48__hour__updated_at__12__hour.sql
10:00:21  
10:00:21  Done. PASS=10 WARN=2 ERROR=2 SKIP=0 TOTAL=14

As we can see:

  • Any test that we wanted to have failed, has failed.
  • Any test that we wanted to have warned, has warned.
  • And any test that we wanted to have passed, has passed.
@alecburnett

alecburnett commented Oct 9, 2024

This doesn't work correctly.

The where clause filters on the loaded_at_field of each row - not on the max loaded at value. It needs to filter on last_loaded_at.

For example, your test is returning the wrong date:
[image]

When the max date in the table is actually a lot later:
[image]
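
The difference between the two filters can be shown with a small Python model. The table contents and the 48 hour threshold below are made up for illustration: one row was loaded 60 hours ago and another an hour ago.

```python
from datetime import datetime, timedelta

now = datetime(2024, 10, 9, 12, 0, 0)  # arbitrary fixed "current timestamp"
# Hypothetical table contents: one old row and one recently loaded row.
loaded_at = [now - timedelta(hours=60), now - timedelta(hours=1)]
cutoff = now - timedelta(hours=48)

# Row-level filter, then aggregate:
#   select count(*), max(loaded_at) from t where loaded_at < cutoff
stale_rows = [ts for ts in loaded_at if ts < cutoff]
row_level = (len(stale_rows), max(stale_rows) if stale_rows else None)

# Aggregate first, then filter on the max:
#   select ... from (select max(loaded_at) as last_loaded_at from t)
#   where last_loaded_at < cutoff
last_loaded_at = max(loaded_at)
table_is_stale = last_loaded_at < cutoff

print("row-level filter:", row_level)  # finds 1 stale row and reports the old date
print("max-based filter: stale =", table_is_stale)
```

The row-level version flags the table because a single old row exists and reports that old row's timestamp as last_loaded_at, while the max-based version treats the table as fresh because its newest row is only an hour old.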

@alecburnett

Appreciate this is a bit more verbose - but works how I would expect it to:

-- tests/generic/freshness.sql
-- https://gist.github.com/jeremyyeo - modified to work correctly.
{% test freshness(model, loaded_at_field, warn_after=none, error_after=none) %}

{% if warn_after and error_after %}

    {{ config(warn_if='=1', error_if='=2') }}

    with check_conditions as (

        -- Aggregate over the whole model first so that the threshold filters
        -- below apply to the max timestamp rather than to individual rows.
        select 
            count(*) as num_rows, 
            max({{ loaded_at_field }}) as last_loaded_at

        from {{ model }}

    ), filter_condition as (

        -- One row if the max timestamp is older than the warn threshold...
        select num_rows, last_loaded_at, 'warn' as condition
        from check_conditions 
        where last_loaded_at < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')

            union all

        -- ...and a second row if it is also older than the error threshold.
        select num_rows, last_loaded_at, 'error' as condition
        from check_conditions 
        where last_loaded_at < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')

    )

{% elif warn_after %}

    {{ config(warn_if='=1', error_if='=2') }}

    with check_conditions as (

        select 
            count(*) as num_rows, 
            max({{loaded_at_field}}) as last_loaded_at, 
            'warn' as condition 

          from {{ model }}

     ), filter_condition as (

        select * from check_conditions 
        where last_loaded_at < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')

     )

{% elif error_after %}

    {{ config(error_if='=1') }}

    with check_conditions as (

        select 
            count(*) as num_rows, 
            max({{loaded_at_field}}) as last_loaded_at, 
            'error' as condition 

          from {{ model }}

    ), filter_condition as (

        select * from check_conditions 
        where last_loaded_at < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')
    )

{% else %}
    {% do exceptions.raise_compiler_error('Freshness test applied to model ' ~ model ~ ' but missing warn_after/error_after configs.') %}
{% endif %}

select * from filter_condition
where num_rows > 0

{% endtest %}

@alecburnett

Thanks for posting this though btw - very helpful for applying source freshness on Snowflake stage sources for S3 - couldn't seem to get dbt source freshness command to work for this.
