# dbt_project.yml
name: my_dbt_project
config-version: 2
version: '1.0.0'
models:
  my_dbt_project:
    +materialized: table
on-run-end: "{{ check_test_model(results, graph) }}"
# models/schema.yml
version: 2
models:
  - name: foo
    columns:
      - name: id
        tests:
          - not_null
-- models/foo.sql
select 1 id
-- macros/check_test_model.sql
{% macro check_test_model(results, graph) %}
{% if execute %}
{# Get model metadata from the graph object. #}
{% set model_metadata = ({}) %}
{% for n in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
{% do model_metadata.update({n.unique_id: {"model_database": n.database, "model_schema": n.schema}}) %}
{% endfor %}
{# Get tests from the results object and look up each model's metadata. #}
{% for r in results %}
{% set line %}
>>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
>>>>> Test {{ r.node.unique_id }} attached to model {{ r.node.attached_node }} (database: {{ model_metadata[r.node.attached_node]["model_database"] }}, schema:{{ model_metadata[r.node.attached_node]["model_schema"] }}).
{% endset %}
{% do log(line, True) %}
{% endfor %}
{% endif %}
{% endmacro %}
$ dbt test
10:10:41 Running with dbt=1.5.4
10:10:42 Registered adapter: snowflake=1.5.3
10:10:45 Found 1 model, 1 test, 0 snapshots, 0 analyses, 754 macros, 1 operation, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
10:10:45
10:10:48 Concurrency: 1 threads (target='sf')
10:10:48
10:10:48 1 of 1 START test not_null_foo_id .............................................. [RUN]
10:10:50 1 of 1 PASS not_null_foo_id .................................................... [PASS in 2.54s]
10:10:50
10:10:50 Running 1 on-run-end hook
10:10:50
>>>>> Test test.my_dbt_project.not_null_foo_id.f099b1e59c had a status of pass.
>>>>> Test test.my_dbt_project.not_null_foo_id.f099b1e59c attached to model model.my_dbt_project.foo (database: development, schema:dbt_jyeo).
10:10:50 1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
10:10:50 1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
10:10:50
10:10:50
10:10:50 Finished running 1 test, 1 hook in 0 hours 0 minutes and 5.66 seconds (5.66s).
10:10:50
10:10:50 Completed successfully
10:10:50
10:10:50 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
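The macro above is essentially a two-step join: index the model nodes by unique_id, then look up each test's attached_node in that index. Below is a minimal Python sketch of the same logic, using plain dictionaries in place of dbt's graph and results objects. The function name describe_tests and the dictionary layout are illustrative assumptions, not dbt APIs. The sample values are taken from the log output above.

```python
def describe_tests(results, nodes):
    # Step 1: index model nodes by unique_id, as the macro does with graph.nodes.
    model_metadata = {
        n["unique_id"]: {"model_database": n["database"], "model_schema": n["schema"]}
        for n in nodes
        if n["resource_type"] == "model"
    }
    lines = []
    # Step 2: look up each result's attached model in the index.
    for r in results:
        attached = r["node"].get("attached_node")
        meta = model_metadata.get(attached)
        if meta is None:
            # Addition of this sketch: skip results with no known attached model.
            continue
        lines.append(
            f"Test {r['node']['unique_id']} ({r['status']}) attached to model {attached} "
            f"(database: {meta['model_database']}, schema: {meta['model_schema']})"
        )
    return lines


# Sample data shaped after the run shown above (hypothetical dict layout).
nodes = [{
    "unique_id": "model.my_dbt_project.foo",
    "resource_type": "model",
    "database": "development",
    "schema": "dbt_jyeo",
}]
results = [{
    "status": "pass",
    "node": {
        "unique_id": "test.my_dbt_project.not_null_foo_id.f099b1e59c",
        "attached_node": "model.my_dbt_project.foo",
    },
}]
print(describe_tests(results, nodes))
```

The skip in step 2 is a design choice of the sketch: the macro indexes model_metadata directly, so it assumes every result is a test attached to a model.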
# dbt_project.yml
name: my_dbt_project
config-version: 2
version: '1.0.0'
on-run-end: "{{ check_test_source(results, graph) }}"
# models/sources.yml
version: 2
sources:
  - name: dbt_jyeo
    tables:
      - name: my_source
        columns:
          - name: c
            tests:
              - not_null
-- macros/check_test_source.sql
{% macro check_test_source(results, graph) %}
{% if execute %}
{# Get source metadata from the graph object. #}
{% set source_metadata = ({}) %}
{% for s in graph.sources.values() %}
{% do source_metadata.update({s.unique_id: {"source_fqn": s.relation_name}}) %}
{% endfor %}
{# Get tests from the results object and look up each source's metadata. #}
{% for r in results %}
{% set line %}
>>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
>>>>> Test {{ r.node.unique_id }} attached to source {{ r.node.depends_on.nodes[0] }} (fqn: {{ source_metadata[r.node.depends_on.nodes[0]]["source_fqn"] }}).
{% endset %}
{% do log(line, True) %}
{% endfor %}
{% endif %}
{% endmacro %}
$ dbt test
10:27:49 Running with dbt=1.5.4
10:27:50 Registered adapter: snowflake=1.5.3
10:27:52 Found 0 models, 1 test, 0 snapshots, 0 analyses, 754 macros, 1 operation, 0 seed files, 1 source, 0 exposures, 0 metrics, 0 groups
10:27:52
10:27:52 Concurrency: 1 threads (target='sf')
10:27:52
10:27:52 1 of 1 START test source_not_null_dbt_jyeo_my_source_c ......................... [RUN]
10:27:55 1 of 1 PASS source_not_null_dbt_jyeo_my_source_c ............................... [PASS in 2.97s]
10:27:55
10:27:55 Running 1 on-run-end hook
10:27:55
>>>>> Test test.my_dbt_project.source_not_null_dbt_jyeo_my_source_c.b87b21a427 had a status of pass.
>>>>> Test test.my_dbt_project.source_not_null_dbt_jyeo_my_source_c.b87b21a427 attached to source source.my_dbt_project.dbt_jyeo.my_source (fqn: development.dbt_jyeo.my_source).
10:27:55 1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
10:27:55 1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
10:27:55
10:27:55
10:27:55 Finished running 1 test, 1 hook in 0 hours 0 minutes and 3.01 seconds (3.01s).
10:27:55
10:27:55 Completed successfully
10:27:55
10:27:55 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
An astute reader pointed out that r.node.depends_on.nodes[0] doesn't work as expected when the source tests also include a relationships test, because r.node.depends_on.nodes can then contain more than one node. Here is a second take.
# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: 1.0
models:
  my_dbt_project:
    +materialized: table
on-run-end: "{{ check_test_source(results, graph) }}"
# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
        columns:
          - name: c
            tests:
              - not_null
              - relationships:
                  to: ref('foo')
                  field: id
      - name: my_source_b
        columns:
          - name: c
            tests:
              - unique
              - relationships:
                  to: source('raw', 'my_source_a')
                  field: c
-- models/foo.sql
select 'foo' id
union
select 'bar'
-- macros/check_test_source.sql
{% macro check_test_source(results, graph) %}
{% if execute %}
{% set re = modules.re %}
{# Get source metadata from the graph object. #}
{% set source_metadata = ({}) %}
{% for s in graph.sources.values() %}
{% set source_identifier = s.source_name ~ "." ~ s.name %}
{% do source_metadata.update({source_identifier: {"source_fqn": s.relation_name}}) %}
{% endfor %}
{# Get tests from the results object and look up each source's metadata. #}
{% for r in results %}
{# Convert "{{ get_where_subquery(source('raw', 'my_source_b')) }}" to "raw.my_source_b" #}
{% set test_source_caller = r.node.test_metadata.kwargs.model %}
{% set test_source_caller = re.search("source\((.*)\)\)", test_source_caller).group(1) | replace("'", "") | replace(", ", ".") %}
{% set line %}
>>>>> Test {{ r.node.unique_id }} had a status of {{ r.status }}.
>>>>> Test {{ r.node.unique_id }} attached to source {{ test_source_caller }} (fqn: {{ source_metadata[test_source_caller]["source_fqn"] }}).
{% endset %}
{% do log(line, True) %}
{% endfor %}
{% endif %}
{% endmacro %}
09:03:42 Running with dbt=1.5.4
09:03:43 Registered adapter: snowflake=1.5.3
09:03:44 Found 1 model, 4 tests, 0 snapshots, 0 analyses, 819 macros, 1 operation, 0 seed files, 2 sources, 0 exposures, 0 metrics, 0 groups
09:03:44
09:03:46 Concurrency: 1 threads (target='sf')
09:03:46
09:03:46 1 of 4 START test source_not_null_raw_my_source_a_c ............................ [RUN]
09:03:49 1 of 4 PASS source_not_null_raw_my_source_a_c .................................. [PASS in 2.43s]
09:03:49 2 of 4 START test source_relationships_raw_my_source_a_c__id__ref_foo_ ......... [RUN]
09:03:51 2 of 4 PASS source_relationships_raw_my_source_a_c__id__ref_foo_ ............... [PASS in 2.38s]
09:03:51 3 of 4 START test source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_ [RUN]
09:03:54 3 of 4 PASS source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_ . [PASS in 2.42s]
09:03:54 4 of 4 START test source_unique_raw_my_source_b_c .............................. [RUN]
09:03:56 4 of 4 PASS source_unique_raw_my_source_b_c .................................... [PASS in 2.29s]
09:03:56
09:03:56 Running 1 on-run-end hook
09:03:56
>>>>> Test test.my_dbt_project.source_not_null_raw_my_source_a_c.834a7e0326 had a status of pass.
>>>>> Test test.my_dbt_project.source_not_null_raw_my_source_a_c.834a7e0326 attached to source raw.my_source_a (fqn: development.dbt_jyeo.my_source_a).
09:03:56
>>>>> Test test.my_dbt_project.source_relationships_raw_my_source_a_c__id__ref_foo_.3f86380dba had a status of pass.
>>>>> Test test.my_dbt_project.source_relationships_raw_my_source_a_c__id__ref_foo_.3f86380dba attached to source raw.my_source_a (fqn: development.dbt_jyeo.my_source_a).
09:03:56
>>>>> Test test.my_dbt_project.source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_.e51fe7a5e0 had a status of pass.
>>>>> Test test.my_dbt_project.source_relationships_raw_my_source_b_c__c__source_raw_my_source_a_.e51fe7a5e0 attached to source raw.my_source_b (fqn: development.dbt_jyeo.my_source_b).
09:03:56
>>>>> Test test.my_dbt_project.source_unique_raw_my_source_b_c.9e34d6c1f7 had a status of pass.
>>>>> Test test.my_dbt_project.source_unique_raw_my_source_b_c.9e34d6c1f7 attached to source raw.my_source_b (fqn: development.dbt_jyeo.my_source_b).
09:03:56 1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
09:03:56 1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
09:03:56
09:03:56
09:03:56 Finished running 4 tests, 1 hook in 0 hours 0 minutes and 12.40 seconds (12.40s).
09:03:56
09:03:56 Completed successfully
09:03:56
09:03:56 Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
After some research, I found other keys that look helpful, such as r.node.sources. Unfortunately, it appears to be an ordered list of every source the test references, and the order does not indicate which source the test is attached to. For example, if the source-to-source relationships test above were defined as:
# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
      - name: my_source_b
        columns:
          - name: c
            tests:
              - relationships:
                  to: source('raw', 'my_source_a')
                  field: c
OR
# models/sources.yml
version: 2
sources:
  - name: raw
    schema: dbt_jyeo
    tables:
      - name: my_source_a
        columns:
          - name: c
            tests:
              - relationships:
                  to: source('raw', 'my_source_b')
                  field: c
      - name: my_source_b
Then r.node.sources would be [['raw', 'my_source_a'], ['raw', 'my_source_b']] in either case.
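To make the ambiguity concrete, here is a small Python sketch. It takes the list value quoted above for both YAML variants and shows that picking a fixed position from r.node.sources cannot be right for both. The variable names are illustrative, and the "attached" values come from reading the YAML, not from any dbt object.

```python
# r.node.sources as reported above for each variant of sources.yml.
sources_when_test_on_b = [["raw", "my_source_a"], ["raw", "my_source_b"]]
sources_when_test_on_a = [["raw", "my_source_a"], ["raw", "my_source_b"]]

# The source each test is actually attached to, read off the YAML.
attached_in_variant_1 = ["raw", "my_source_b"]
attached_in_variant_2 = ["raw", "my_source_a"]

# Identical input for two different expected answers.
same_input = sources_when_test_on_b == sources_when_test_on_a

# Taking the first entry is correct for only one of the two variants.
first_entry_correct = [
    sources_when_test_on_b[0] == attached_in_variant_1,
    sources_when_test_on_a[0] == attached_in_variant_2,
]
print(same_input, first_entry_correct)
```

Because the two inputs are equal, the same holds for any other fixed index: whatever rule is applied to the list alone returns the same answer for both variants.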
The approach I took above uses r.node.test_metadata.kwargs.model instead. I found that it contains the "caller" source, i.e. {{ get_where_subquery(source('raw', 'my_source_b')) }} or {{ get_where_subquery(source('raw', 'my_source_a')) }} as appropriate. A small regex then tidies that up into raw.my_source_b or raw.my_source_a.
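As a rough illustration of that tidy-up step, the Python sketch below applies the same regular expression and replacements as the macro (modules.re in dbt's Jinja context is Python's re module) to a kwargs.model string. The helper name source_caller is hypothetical. Returning None when the string has no source(...) call is an addition of this sketch, and the ref(...) input is an assumed shape for a test attached to a model.

```python
import re


def source_caller(kwargs_model):
    # Capture everything between "source(" and the final "))".
    match = re.search(r"source\((.*)\)\)", kwargs_model)
    if match is None:
        # No source(...) call found. The macro calls .group(1) without this check.
        return None
    # "'raw', 'my_source_b'" -> "raw, my_source_b" -> "raw.my_source_b"
    return match.group(1).replace("'", "").replace(", ", ".")


print(source_caller("{{ get_where_subquery(source('raw', 'my_source_b')) }}"))
print(source_caller("{{ get_where_subquery(ref('foo')) }}"))
```

The result has the same source_name.name form that the macro uses as the key of source_metadata, which is why the lookup there works without the full unique_id.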