Skip to content

Instantly share code, notes, and snippets.

@jeremyyeo
Last active August 31, 2023 10:37
Show Gist options
  • Save jeremyyeo/83adf1f412e5e497baef60e5ada35bf8 to your computer and use it in GitHub Desktop.
Save jeremyyeo/83adf1f412e5e497baef60e5ada35bf8 to your computer and use it in GitHub Desktop.
What keys to use from results or graph to associate a test with a node (model/source) #dbt

What keys to use from results or graph to associate a test with a node (model/source)

Associating a model with its test

# dbt_project.yml
name: my_dbt_project
config-version: 2
version: '1.0.0'

models:
  my_dbt_project:
    +materialized: table

on-run-end: "{{ check_test_model(results, graph) }}"

# models/schema.yml
version: 2
models:
  - name: foo
    columns:
      - name: id
        tests:
          - not_null
-- models/foo.sql
select 1 id

-- macros/check_test_model.sql
{% macro check_test_model(results, graph) %}
  {% if execute %}

    {# Get model metadata from the graph object. #}
    {% set model_metadata = ({}) %}
    {% for n in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}.
      {% do model_metadata.update({n.unique_id: {"model_database": n.database, "model_schema": n.schema}}) %}
    {% endfor %}


    {# Get tests from the results object and lookup a models metadata. #}
    {% for r in results %}
      {% set line %}
          >>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
          >>>>> Test {{ r.node.unique_id }} attached to model {{ r.node.attached_node }} (database: {{ model_metadata[r.node.attached_node]["model_database"] }}, schema:{{ model_metadata[r.node.attached_node]["model_schema"] }}).
      {% endset %}
      {% do log(line, True) %}
    {% endfor %}

  {% endif %}
{% endmacro %}
$ dbt test
10:10:41  Running with dbt=1.5.4
10:10:42  Registered adapter: snowflake=1.5.3
10:10:45  Found 1 model, 1 test, 0 snapshots, 0 analyses, 754 macros, 1 operation, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
10:10:45  
10:10:48  Concurrency: 1 threads (target='sf')
10:10:48  
10:10:48  1 of 1 START test not_null_foo_id .............................................. [RUN]
10:10:50  1 of 1 PASS not_null_foo_id .................................................... [PASS in 2.54s]
10:10:50  
10:10:50  Running 1 on-run-end hook
10:10:50  
          >>>>> Test test.my_dbt_project.not_null_foo_id.f099b1e59c had a status of pass.
          >>>>> Test test.my_dbt_project.not_null_foo_id.f099b1e59c attached to model model.my_dbt_project.foo (database: development, schema:dbt_jyeo).
      
10:10:50  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
10:10:50  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
10:10:50  
10:10:50  
10:10:50  Finished running 1 test, 1 hook in 0 hours 0 minutes and 5.66 seconds (5.66s).
10:10:50  
10:10:50  Completed successfully
10:10:50  
10:10:50  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Associating a source with its test

# dbt_project.yml
name: my_dbt_project
config-version: 2
version: '1.0.0'

on-run-end: "{{ check_test_source(results, graph) }}"

# models/sources.yml
version: 2
sources:
  - name: dbt_jyeo
    tables:
      - name: my_source
        columns:
          - name: c
            tests:
              - not_null
-- macros/check_test_source.sql
{% macro check_test_source(results, graph) %}
  {% if execute %}

    {# Get source metadata from the graph object. #}
    {% set source_metadata = ({}) %}
    {% for s in graph.sources.values() %}
      {% do source_metadata.update({s.unique_id: {"source_fqn": s.relation_name}}) %}
    {% endfor %}

    {# Get tests from the results object and lookup a sources metadata.  #}
    {% for r in results %}
      {% set line %}
          >>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
          >>>>> Test {{ r.node.unique_id }} attached to source {{ r.node.depends_on.nodes[0] }} (fqn: {{ source_metadata[r.node.depends_on.nodes[0]]["source_fqn"] }}).
      {% endset %}
      {% do log(line, True) %}
    {% endfor %}

  {% endif %}
{% endmacro %}
$ dbt test

10:27:49  Running with dbt=1.5.4
10:27:50  Registered adapter: snowflake=1.5.3
10:27:52  Found 0 models, 1 test, 0 snapshots, 0 analyses, 754 macros, 1 operation, 0 seed files, 1 source, 0 exposures, 0 metrics, 0 groups
10:27:52  
10:27:52  Concurrency: 1 threads (target='sf')
10:27:52  
10:27:52  1 of 1 START test source_not_null_dbt_jyeo_my_source_c ......................... [RUN]
10:27:55  1 of 1 PASS source_not_null_dbt_jyeo_my_source_c ............................... [PASS in 2.97s]
10:27:55  
10:27:55  Running 1 on-run-end hook
10:27:55  
          >>>>> Test test.my_dbt_project.source_not_null_dbt_jyeo_my_source_c.b87b21a427 had a status of pass.
          >>>>> Test test.my_dbt_project.source_not_null_dbt_jyeo_my_source_c.b87b21a427 attached to source source.my_dbt_project.dbt_jyeo.my_source (fqn: development.dbt_jyeo.my_source).
      
10:27:55  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
10:27:55  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
10:27:55  
10:27:55  
10:27:55  Finished running 1 test, 1 hook in 0 hours 0 minutes and 3.01 seconds (3.01s).
10:27:55  
10:27:55  Completed successfully
10:27:55  
10:27:55  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Associating a source with its test v2

An astute reader pointed out that r.node.depends_on.nodes[0] doesn't quite work as expected when the source tests also contain a relationships test because the r.node.depends_on.nodes can contain multiple "nodes". This is a second take at doing this.

# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: 1.0

models:
  my_dbt_project:
    +materialized: table

on-run-end: "{{ check_test_source(results, graph) }}"


# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
        columns:
          - name: c
            tests:
              - not_null
              - relationships:
                  to: ref('foo')
                  field: id
      - name: my_source_b
        columns:
          - name: c
            tests:
              - unique
              - relationships:
                  to: source('raw', 'my_source_a')
                  field: c
-- models/foo.sql
select 'foo' id
 union
select 'bar' 

-- macros/check_test_source.sql
{% macro check_test_source(results, graph) %}
  {% if execute %}
    {% set re = modules.re %}

    {# Get source metadata from the graph object. #}
    {% set source_metadata = ({}) %}

    {% for s in graph.sources.values() %}
      {% set source_identifier = s.source_name ~ "."  ~ s.name %}
      {% do source_metadata.update({source_identifier: {"source_fqn": s.relation_name}}) %}
    {% endfor %}

    {# Get tests from the results object and lookup a sources metadata.  #}
    {% for r in results %}
      {# Convert "{{ get_where_subquery(source('raw', 'my_source_b')) }}" to "raw.my_source_b" #}
      {% set test_source_caller = r.node.test_metadata.kwargs.model %}
      {% set test_source_caller = re.search("source\((.*)\)\)", test_source_caller).group(1) | replace("'", "") | replace(", ", ".") %}
      
      {% set line %}
          >>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
          >>>>> Test {{ r.node.unique_id }} attached to source {{ test_source_caller }} (fqn: {{ source_metadata[test_source_caller]["source_fqn"] }}).
      {% endset %}
      {% do log(line, True) %}
    {% endfor %}

  {% endif %}
{% endmacro %}
09:03:42  Running with dbt=1.5.4
09:03:43  Registered adapter: snowflake=1.5.3
09:03:44  Found 1 model, 4 tests, 0 snapshots, 0 analyses, 819 macros, 1 operation, 0 seed files, 2 sources, 0 exposures, 0 metrics, 0 groups
09:03:44  
09:03:46  Concurrency: 1 threads (target='sf')
09:03:46  
09:03:46  1 of 4 START test source_not_null_raw_my_source_a_c ............................ [RUN]
09:03:49  1 of 4 PASS source_not_null_raw_my_source_a_c .................................. [PASS in 2.43s]
09:03:49  2 of 4 START test source_relationships_raw_my_source_a_c__id__ref_foo_ ......... [RUN]
09:03:51  2 of 4 PASS source_relationships_raw_my_source_a_c__id__ref_foo_ ............... [PASS in 2.38s]
09:03:51  3 of 4 START test source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_  [RUN]
09:03:54  3 of 4 PASS source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_ . [PASS in 2.42s]
09:03:54  4 of 4 START test source_unique_raw_my_source_b_c .............................. [RUN]
09:03:56  4 of 4 PASS source_unique_raw_my_source_b_c .................................... [PASS in 2.29s]
09:03:56  
09:03:56  Running 1 on-run-end hook
09:03:56  
          >>>>> Test test.my_dbt_project.source_not_null_raw_my_source_a_c.834a7e0326 had a status of pass.
          >>>>> Test test.my_dbt_project.source_not_null_raw_my_source_a_c.834a7e0326 attached to source raw.my_source_a (fqn: development.dbt_jyeo.my_source_a).
      
09:03:56  
          >>>>> Test test.my_dbt_project.source_relationships_raw_my_source_a_c__id__ref_foo_.3f86380dba had a status of pass.
          >>>>> Test test.my_dbt_project.source_relationships_raw_my_source_a_c__id__ref_foo_.3f86380dba attached to source raw.my_source_a (fqn: development.dbt_jyeo.my_source_a).
      
09:03:56  
          >>>>> Test test.my_dbt_project.source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_.e51fe7a5e0 had a status of pass.
          >>>>> Test test.my_dbt_project.source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_.e51fe7a5e0 attached to source raw.my_source_b (fqn: development.dbt_jyeo.my_source_b).
      
09:03:56  
          >>>>> Test test.my_dbt_project.source_unique_raw_my_source_b_c.9e34d6c1f7 had a status of pass.
          >>>>> Test test.my_dbt_project.source_unique_raw_my_source_b_c.9e34d6c1f7 attached to source raw.my_source_b (fqn: development.dbt_jyeo.my_source_b).
      
09:03:56  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
09:03:56  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
09:03:56  
09:03:56  
09:03:56  Finished running 4 tests, 1 hook in 0 hours 0 minutes and 12.40 seconds (12.40s).
09:03:56  
09:03:56  Completed successfully
09:03:56  
09:03:56  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4

After doing some research, there appear to be other helpful keys - like r.node.sources. Unfortunately, I found that that appeared to be an ordered list of sources. This means that if our above source-to-source relationships test was:

# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
      - name: my_source_b
        columns:
          - name: c
            tests:
              - relationships:
                  to: source('raw', 'my_source_a')
                  field: c

OR

# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
        columns:
          - name: c
            tests:
              - relationships:
                  to: source('raw', 'my_source_b')
                  field: c
      - name: my_source_b

Then r.node.sources would be [['raw', 'my_source_a'], ['raw', 'my_source_b']] in either case.

The approach I took above is to use r.node.test_metadata.kwargs.model instead. I found it to contain the "caller" source - i.e. {{ get_where_subquery(source('raw', 'my_source_b')) }} or {{ get_where_subquery(source('raw', 'my_source_a')) }} appropriately. And then we just use some regex to tidy it up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment