@jeremyyeo
Last active July 26, 2023 12:32

Filtering dbt's catalog query to only relations that are used in the project

Updates

2022-11-23: Changed catalog_2.sql to filter graph nodes for models only. Previously we were retrieving all nodes, which included tests. Added catalog_3.sql, which can filter for specific model tags and only retrieve metadata for those.


A small workaround for dbt-labs/dbt-bigquery#115

If you have a BigQuery dbt project that:

  1. Uses a small number of sources from a dataset with many unrelated tables/views.
  2. Writes to a dataset with many other unrelated tables/views.

It is likely you will run into the above-linked issue. This is because when dbt builds the catalog.json file, it has to iterate through each dataset used in your dbt project AND retrieve every table/view in that dataset, whether or not it is relevant to your dbt project.
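
For a sense of what that per-dataset work looks like, here is a simplified sketch of the kind of metadata query dbt's catalog macro issues against BigQuery's __TABLES__ meta-table (the project/dataset names are placeholders; the real query, shown in the macros attached below, also joins column metadata from INFORMATION_SCHEMA.COLUMNS and COLUMN_FIELD_PATHS):

select
  project_id,
  dataset_id,
  table_id,
  row_count,
  size_bytes,
  type
from `my-project.dbt_jyeo.__TABLES__`  -- returns one row per table/view in the dataset, dbt-related or not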

Let's take a look at a quick example.

  1. Set up BigQuery.
create or replace table `cse-sandbox-319708.dbt_jyeo_src.my_source_a` as select 1 as user_id;
create or replace table `cse-sandbox-319708.dbt_jyeo_src.my_source_b` as select 2 as user_id;
create or replace table `cse-sandbox-319708.dbt_jyeo_src.my_source_c` as select 3 as user_id;

create or replace table `cse-sandbox-319708.dbt_jyeo.baz_19700101` as select 1 as user_id;
create or replace table `cse-sandbox-319708.dbt_jyeo.baz_19700102` as select 2 as user_id;
create or replace table `cse-sandbox-319708.dbt_jyeo.baz_19700103` as select 3 as user_id;

create or replace view `cse-sandbox-319708.dbt_jyeo.qux_19700101` as select 1 as user_id;
create or replace view `cse-sandbox-319708.dbt_jyeo.qux_19700102` as select 2 as user_id;
create or replace view `cse-sandbox-319708.dbt_jyeo.qux_19700103` as select 3 as user_id;

I made sure both datasets were empty beforehand, so after running the statements above, those are the only relations they contain. The dbt_jyeo dataset is where we will write our dbt models, and the dbt_jyeo_src dataset will be used as a source in our project.

  2. Set up the dbt project.
-- models/bar.sql
select 1 as user_id

-- models/foo.sql
select 1 as user_id

^ Make sure these write to the dbt_jyeo dataset.

# models/sources.yml
version: 2
sources:
  - name: dbt_jyeo_src
    tables:
      - name: my_source_a
  3. dbt run to build your models.

  4. Override the catalog macro and put it in your own project.

-- macros/catalog.sql
{% macro get_catalog(information_schema, schemas) -%}
  ...
{%- endmacro %}

I've modified mine slightly (see catalog_1.sql attached to this gist) so I can print out some debugging information. Mainly, I want to log the query being used and also print out the number of rows returned by the query for each dataset.
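
Concretely, the debugging additions are just a few lines at the tail of the macro, right where the query gets executed (taken from catalog_1.sql attached below):

{# log the full catalog query to the debug logs #}
{%- do log(query) -%}
{%- set results = run_query(query) -%}
{# print the schemas queried and the number of rows that came back #}
{%- do log(schemas ~ ' - rows returned: ' ~ results | length, True) -%}
{{ return(results) }}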

  5. Generate the docs and see the output.
$ dbt docs generate
04:22:33  Running with dbt=1.2.0
04:22:35  Found 2 models, 0 tests, 0 snapshots, 0 analyses, 286 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
04:22:35  
04:22:38  Concurrency: 1 threads (target='dev')
04:22:38  
04:22:38  Done.
04:22:38  Building catalog
04:22:49  {'dbt_jyeo'} - rows returned: 6
04:22:58  {'dbt_jyeo_src'} - rows returned: 3
04:22:58  Catalog written to /Users/jeremy/src/dbt-sandcastles/dbt/bigquery/target/catalog.json

^ So we have 2 models in our project and use 1 source, yet a total of 9 rows is returned (most of them unrelated to our dbt project).

  6. Go to our debug logs and run the exact query dbt ran, directly in BigQuery.

[Screenshot: the catalog query from the debug logs, run directly in the BigQuery console]

The query for the dbt_jyeo dataset above returns 6 rows when only 2 are relevant to our dbt project (the same is true if you run the query for the dbt_jyeo_src dataset). See footnotes 1 and 2.

  7. Basically, the fix is what Jerco already proposed in the issue - we should filter the query to only include relations of interest.

  8. Replace the overridden macro with the one in this gist (catalog_2.sql). It builds a list of the model aliases and source table names used in the project and filters the catalog query down to just those relations; a condensed sketch of the change is shown after the output below.

  9. Let's regenerate our docs and look at the output with our new override.

$ dbt docs generate
04:41:28  Running with dbt=1.2.0
04:41:30  Found 2 models, 0 tests, 0 snapshots, 0 analyses, 286 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
04:41:30  
04:41:33  Concurrency: 1 threads (target='dev')
04:41:33  
04:41:33  Done.
04:41:33  Building catalog
04:41:44  {'dbt_jyeo'} - rows returned: 2
04:41:51  {'dbt_jyeo_src'} - rows returned: 1
04:41:51  Catalog written to /Users/jeremy/src/dbt-sandcastles/dbt/bigquery/target/catalog.json

^ We have significantly reduced the number of rows retrieved to build our catalog by filtering down to only the ones that truly matter.
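
For reference, the core change in catalog_2.sql boils down to two pieces: build a list of the relations the project actually uses, then splice that list into the where clause of the catalog query. A condensed sketch (the full macro is attached below as catalog_2.sql):

{# collect the model aliases and source table names from the project graph #}
{%- set relations_in_project = [] -%}
{%- for node in graph.nodes.values() -%}
  {%- if node.resource_type == 'model' -%}
    {%- do relations_in_project.append(node.alias) -%}
  {%- endif -%}
{%- endfor -%}
{%- for source in graph.sources.values() -%}
  {%- do relations_in_project.append(source.name) -%}
{%- endfor -%}
{# make the list unique #}
{%- set relations_in_project = set(relations_in_project) | list -%}

...

{# inside the catalog query: only keep relations (or date-shard prefixes) that the project uses #}
{%- if relations_in_project | length > 0 %}
and coalesce(REGEXP_EXTRACT(table_id, '^(.+)_{1}[0-9]{8}$'), table_id) in (
  {%- for rel in relations_in_project -%}'{{ rel }}'{%- if not loop.last %}, {% endif -%}{%- endfor -%}
)
{% endif -%}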

  10. Jerco notes that the limitation is how large the actual list of relations is - if you have too many of them in your project, the query that dbt has to send to BigQuery could exceed the 1MB query text limit. I have tested this by doing something like the following (after removing the {%- set relations_in_project = set(relations_in_project) | list -%} line):
  {%- for n in range(33333) -%}
    {%- for node in graph.nodes.values() -%}
        {%- do relations_in_project.append(node.alias) -%}
    {%- endfor -%}
    {%- for source in graph.sources.values() -%}
        {%- do relations_in_project.append(source.name) -%}
    {%- endfor -%}
  {%- endfor -%}
  {% do log(relations_in_project | length, True) %}

3 x 33333 ~ 100k relation names in the filtering "and ... in (...)" clause that we added, and it ran just fine. When I tried range(100000), i.e. ~300k names, I ran into the "The query is too large. The maximum standard SQL query length is 1024.00K characters, including comments and white space characters." error. Where the limit falls probably depends on how long your model and source table names are, since in this example they are quite short (e.g. foo).
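
If you're worried about bumping into that limit, one hypothetical sanity check (not part of the attached macros) is to log roughly how much text the generated in (...) list adds before the query is run:

{# hypothetical: estimate how much text the relation filter adds to the catalog query #}
{%- set in_list = "'" ~ (relations_in_project | join("', '")) ~ "'" -%}
{%- do log('relation filter is ~' ~ (in_list | length) ~ ' characters across ' ~ (relations_in_project | length) ~ ' relations', True) -%}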

Adding a filter to narrow down to only specific models.

Some projects may have many thousands of intermediate models, so perhaps we really only want to narrow the catalog query down to certain "key" models. We can do that by using tags.

Let's add a tag to one of our models:

-- models/bar.sql
{{ config(tags=['important']) }}
select 1 as user_id

-- models/foo.sql
select 1 as user_id

P.S. While I'm tagging via the model's config block, you can also tag a whole folder of models in the dbt_project.yml file instead of doing it model by model, as sketched below.
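
For example, a folder-level tag in dbt_project.yml might look like this (my_project and the marts folder are placeholders for your own project name and model subfolder):

# dbt_project.yml
models:
  my_project:      # your project name from the `name:` key
    marts:         # a subfolder under models/
      +tags: ['important']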

And then set up a var in our dbt_project.yml:

# dbt_project.yml
...
vars:
  docs_include_tags: ['important']
...

Add our override macro, catalog_3.sql - the key change relative to catalog_2.sql is excerpted below.
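
The difference is a tag check when building the relation list: if the docs_include_tags var is not set, every model is included (as in catalog_2.sql); if it is set, only models carrying one of the listed tags are included. The relevant part of catalog_3.sql (attached below, lightly reindented here):

{%- for node in graph.nodes.values() -%}
  {%- if node.resource_type == 'model' -%}
    {%- if not var('docs_include_tags', False) -%}
      {# no var set: include every model #}
      {%- do relations_in_project.append(node.alias) -%}
    {%- else -%}
      {# var set: only include models carrying one of the listed tags #}
      {%- for tag in node.config.tags -%}
        {%- if tag in var('docs_include_tags') -%}
          {%- do relations_in_project.append(node.alias) -%}
        {%- endif -%}
      {%- endfor -%}
    {%- endif -%}
  {%- endif -%}
{%- endfor -%}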

Then let's build our docs:

$ dbt docs generate
21:47:59  Running with dbt=1.3.0
21:48:01  Found 2 models, 0 tests, 0 snapshots, 0 analyses, 320 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
21:48:01  
21:48:02  Concurrency: 1 threads (target='dev')
21:48:02  
21:48:03  Done.
21:48:03  Building catalog
21:48:09  {'dbt_jyeo'} - rows returned: 1
21:48:14  {'dbt_jyeo_src'} - rows returned: 1
21:48:14  Catalog written to /Users/jeremy/src/dbt-sandcastles/dbt/bigquery/target/catalog.json

One of the downsides here is that only the tagged model(s) will have metadata:

[Screenshots: the dbt docs pages, showing catalog metadata for the tagged model only]

But perhaps that's a-okay.

Footnotes

  1. For the record, it looks like if you have many hundreds or thousands of rows to be returned, the BigQuery UI can display them elegantly via pagination, but because dbt has to "download all those rows" to the local machine, it can mean a really bad time (https://github.com/dbt-labs/dbt-bigquery/issues/115#issuecomment-1035304410).

  2. There should be 1 row per table-column (since all my toy tables only have 1 column, we only see 1 row per table here).

catalog_1.sql

{% macro get_catalog(information_schema, schemas) -%}
{%- if (schemas | length) == 0 -%}
{# Hopefully nothing cares about the columns we return when there are no rows #}
{%- set query = "select 1 as id limit 0" -%}
{%- else -%}
{%- set query -%}
with tables as (
select
project_id as table_database,
dataset_id as table_schema,
table_id as original_table_name,
concat(project_id, '.', dataset_id, '.', table_id) as relation_id,
row_count,
size_bytes as size_bytes,
case
when type = 1 then 'table'
when type = 2 then 'view'
else 'external'
end as table_type,
REGEXP_CONTAINS(table_id, '^.+[0-9]{8}$') and coalesce(type, 0) = 1 as is_date_shard,
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name
from {{ information_schema.replace(information_schema_view='__TABLES__') }}
where (
{%- for schema in schemas -%}
upper(dataset_id) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
),
extracted as (
select *,
case
when is_date_shard then shard_base_name
else original_table_name
end as table_name
from tables
),
unsharded_tables as (
select
table_database,
table_schema,
table_name,
coalesce(table_type, 'external') as table_type,
is_date_shard,
struct(
min(shard_name) as shard_min,
max(shard_name) as shard_max,
count(*) as shard_count
) as table_shards,
sum(size_bytes) as size_bytes,
sum(row_count) as row_count,
max(relation_id) as relation_id
from extracted
group by 1,2,3,4,5
),
info_schema_columns as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
table_catalog as table_database,
table_schema,
table_name,
-- use the "real" column name from the paths query below
column_name as base_column_name,
ordinal_position as column_index,
is_partitioning_column,
clustering_ordinal_position
from {{ information_schema.replace(information_schema_view='COLUMNS') }}
where ordinal_position is not null
),
info_schema_column_paths as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
field_path as column_name,
data_type as column_type,
column_name as base_column_name,
description as column_comment
from {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }}
),
columns as (
select * except (base_column_name)
from info_schema_columns
join info_schema_column_paths using (relation_id, base_column_name)
),
column_stats as (
select
table_database,
table_schema,
table_name,
max(relation_id) as relation_id,
max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned,
max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column,
max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered,
array_to_string(
array_agg(
case
when clustering_ordinal_position is not null then column_name
else null
end ignore nulls
order by clustering_ordinal_position
), ', '
) as clustering_columns
from columns
group by 1,2,3
)
select
unsharded_tables.table_database,
unsharded_tables.table_schema,
case
when is_date_shard then concat(unsharded_tables.table_name, '*')
else unsharded_tables.table_name
end as table_name,
unsharded_tables.table_type,
-- coalesce name and type for External tables - these columns are not
-- present in the COLUMN_FIELD_PATHS resultset
coalesce(columns.column_name, '<unknown>') as column_name,
-- invent a row number to account for nested fields -- BQ does
-- not treat these nested properties as independent fields
row_number() over (
partition by relation_id
order by columns.column_index, columns.column_name
) as column_index,
coalesce(columns.column_type, '<unknown>') as column_type,
columns.column_comment,
'Shard count' as `stats__date_shards__label`,
table_shards.shard_count as `stats__date_shards__value`,
'The number of date shards in this table' as `stats__date_shards__description`,
is_date_shard as `stats__date_shards__include`,
'Shard (min)' as `stats__date_shard_min__label`,
table_shards.shard_min as `stats__date_shard_min__value`,
'The first date shard in this table' as `stats__date_shard_min__description`,
is_date_shard as `stats__date_shard_min__include`,
'Shard (max)' as `stats__date_shard_max__label`,
table_shards.shard_max as `stats__date_shard_max__value`,
'The last date shard in this table' as `stats__date_shard_max__description`,
is_date_shard as `stats__date_shard_max__include`,
'# Rows' as `stats__num_rows__label`,
row_count as `stats__num_rows__value`,
'Approximate count of rows in this table' as `stats__num_rows__description`,
(unsharded_tables.table_type = 'table') as `stats__num_rows__include`,
'Approximate Size' as `stats__num_bytes__label`,
size_bytes as `stats__num_bytes__value`,
'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`,
(unsharded_tables.table_type = 'table') as `stats__num_bytes__include`,
'Partitioned By' as `stats__partitioning_type__label`,
partition_column as `stats__partitioning_type__value`,
'The partitioning column for this table' as `stats__partitioning_type__description`,
is_partitioned as `stats__partitioning_type__include`,
'Clustered By' as `stats__clustering_fields__label`,
clustering_columns as `stats__clustering_fields__value`,
'The clustering columns for this table' as `stats__clustering_fields__description`,
is_clustered as `stats__clustering_fields__include`
-- join using relation_id (an actual relation, not a shard prefix) to make
-- sure that column metadata is picked up through the join. This will only
-- return the column information for the "max" table in a date-sharded table set
from unsharded_tables
left join columns using (relation_id)
left join column_stats using (relation_id)
{%- endset -%}
{%- endif -%}
{%- do log(query) -%}
{%- set results = run_query(query) -%}
{%- do log(schemas ~ ' - rows returned: ' ~ results | length, True) -%}
{{ return(results) }}
{%- endmacro %}

catalog_2.sql

{% macro get_catalog(information_schema, schemas) -%}
{#
/*
This macro changes the catalog query to filter on dbt objects only and not all objects in a dataset.
The regex is very sensitive to the date sharding pattern of your tables so you may want to
change the regex of '^.+[0-9]{8}$' to '^.+[0-9]{6}$' if say your tables are sharded by YYYYMM
instead of YYYYMMDD.
*/
#}
{# Here is where we build our list of relations of interest #}
{%- set relations_in_project = [] -%}
{%- for node in graph.nodes.values() -%}
{%- if node.resource_type == 'model' -%}
{%- do relations_in_project.append(node.alias) -%}
{%- endif -%}
{%- endfor -%}
{%- for source in graph.sources.values() -%}
{%- do relations_in_project.append(source.name) -%}
{%- endfor -%}
{# Make the list unique #}
{%- set relations_in_project = set(relations_in_project) | list -%}
{%- if (schemas | length) == 0 -%}
{# Hopefully nothing cares about the columns we return when there are no rows #}
{%- set query = "select 1 as id limit 0" -%}
{%- else -%}
{%- set query -%}
with tables as (
select
project_id as table_database,
dataset_id as table_schema,
table_id as original_table_name,
concat(project_id, '.', dataset_id, '.', table_id) as relation_id,
row_count,
size_bytes as size_bytes,
case
when type = 1 then 'table'
when type = 2 then 'view'
else 'external'
end as table_type,
REGEXP_CONTAINS(table_id, '^.+[0-9]{8}$') and coalesce(type, 0) in (1, 2) as is_date_shard,
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name
from {{ information_schema.replace(information_schema_view='__TABLES__') }}
where (
{%- for schema in schemas -%}
upper(dataset_id) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{# Here is where we filter the relations so we dont simply fetch all of them #}
{%- if relations_in_project | length > 0 %}
and coalesce(REGEXP_EXTRACT(table_id, '^(.+)_{1}[0-9]{8}$'), table_id) in (
{%- for rel in relations_in_project -%}'{{ rel }}'{%- if not loop.last %}, {% endif -%}{%- endfor -%}
)
{% endif -%}
),
extracted as (
select *,
case
when is_date_shard then shard_base_name
else original_table_name
end as table_name
from tables
),
unsharded_tables as (
select
table_database,
table_schema,
table_name,
coalesce(table_type, 'external') as table_type,
is_date_shard,
struct(
min(shard_name) as shard_min,
max(shard_name) as shard_max,
count(*) as shard_count
) as table_shards,
sum(size_bytes) as size_bytes,
sum(row_count) as row_count,
max(relation_id) as relation_id
from extracted
group by 1,2,3,4,5
),
info_schema_columns as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
table_catalog as table_database,
table_schema,
table_name,
-- use the "real" column name from the paths query below
column_name as base_column_name,
ordinal_position as column_index,
is_partitioning_column,
clustering_ordinal_position
from {{ information_schema.replace(information_schema_view='COLUMNS') }}
where ordinal_position is not null
),
info_schema_column_paths as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
field_path as column_name,
data_type as column_type,
column_name as base_column_name,
description as column_comment
from {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }}
),
columns as (
select * except (base_column_name)
from info_schema_columns
join info_schema_column_paths using (relation_id, base_column_name)
),
column_stats as (
select
table_database,
table_schema,
table_name,
max(relation_id) as relation_id,
max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned,
max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column,
max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered,
array_to_string(
array_agg(
case
when clustering_ordinal_position is not null then column_name
else null
end ignore nulls
order by clustering_ordinal_position
), ', '
) as clustering_columns
from columns
group by 1,2,3
)
select
unsharded_tables.table_database,
unsharded_tables.table_schema,
case
when is_date_shard then concat(unsharded_tables.table_name, '*')
else unsharded_tables.table_name
end as table_name,
unsharded_tables.table_type,
-- coalesce name and type for External tables - these columns are not
-- present in the COLUMN_FIELD_PATHS resultset
coalesce(columns.column_name, '<unknown>') as column_name,
-- invent a row number to account for nested fields -- BQ does
-- not treat these nested properties as independent fields
row_number() over (
partition by relation_id
order by columns.column_index, columns.column_name
) as column_index,
coalesce(columns.column_type, '<unknown>') as column_type,
columns.column_comment,
'Shard count' as `stats__date_shards__label`,
table_shards.shard_count as `stats__date_shards__value`,
'The number of date shards in this table' as `stats__date_shards__description`,
is_date_shard as `stats__date_shards__include`,
'Shard (min)' as `stats__date_shard_min__label`,
table_shards.shard_min as `stats__date_shard_min__value`,
'The first date shard in this table' as `stats__date_shard_min__description`,
is_date_shard as `stats__date_shard_min__include`,
'Shard (max)' as `stats__date_shard_max__label`,
table_shards.shard_max as `stats__date_shard_max__value`,
'The last date shard in this table' as `stats__date_shard_max__description`,
is_date_shard as `stats__date_shard_max__include`,
'# Rows' as `stats__num_rows__label`,
row_count as `stats__num_rows__value`,
'Approximate count of rows in this table' as `stats__num_rows__description`,
(unsharded_tables.table_type = 'table') as `stats__num_rows__include`,
'Approximate Size' as `stats__num_bytes__label`,
size_bytes as `stats__num_bytes__value`,
'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`,
(unsharded_tables.table_type = 'table') as `stats__num_bytes__include`,
'Partitioned By' as `stats__partitioning_type__label`,
partition_column as `stats__partitioning_type__value`,
'The partitioning column for this table' as `stats__partitioning_type__description`,
is_partitioned as `stats__partitioning_type__include`,
'Clustered By' as `stats__clustering_fields__label`,
clustering_columns as `stats__clustering_fields__value`,
'The clustering columns for this table' as `stats__clustering_fields__description`,
is_clustered as `stats__clustering_fields__include`
-- join using relation_id (an actual relation, not a shard prefix) to make
-- sure that column metadata is picked up through the join. This will only
-- return the column information for the "max" table in a date-sharded table set
from unsharded_tables
left join columns using (relation_id)
left join column_stats using (relation_id)
{%- endset -%}
{%- endif -%}
{%- do log(query) -%}
{%- set results = run_query(query) -%}
{%- do log(schemas ~ ' - rows returned: ' ~ results | length, True) -%}
{{ return(results) }}
{%- endmacro %}

catalog_3.sql

{% macro get_catalog(information_schema, schemas) -%}
{#
/*
This macro changes the catalog query to filter on dbt objects only and not all objects in a dataset.
The regex is very sensitive to the date sharding pattern of your tables so you may want to
change the regex of '^.+[0-9]{8}$' to '^.+[0-9]{6}$' if say your tables are sharded by YYYYMM
instead of YYYYMMDD.
If you want to filter only for models that have specific tags - add a 'docs_include_tags' var in
your dbt_project.yml file with the list of tags. Example:
vars:
docs_include_tags: ['marts']
*/
#}
{# Here is where we build our list of relations of interest #}
{%- set relations_in_project = [] -%}
{%- for node in graph.nodes.values() -%}
{%- if node.resource_type == 'model' -%}
{%- if not var('docs_include_tags', False) -%}
{%- do relations_in_project.append(node.alias) -%}
{%- else -%}
{%- for tag in node.config.tags -%}
{%- if tag in var('docs_include_tags') -%}
{%- do relations_in_project.append(node.alias) -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}
{%- endif -%}
{%- endfor -%}
{%- for source in graph.sources.values() -%}
{%- do relations_in_project.append(source.name) -%}
{%- endfor -%}
{# Make the list unique #}
{%- set relations_in_project = set(relations_in_project) | list -%}
{%- if (schemas | length) == 0 -%}
{# Hopefully nothing cares about the columns we return when there are no rows #}
{%- set query = "select 1 as id limit 0" -%}
{%- else -%}
{%- set query -%}
with tables as (
select
project_id as table_database,
dataset_id as table_schema,
table_id as original_table_name,
concat(project_id, '.', dataset_id, '.', table_id) as relation_id,
row_count,
size_bytes as size_bytes,
case
when type = 1 then 'table'
when type = 2 then 'view'
else 'external'
end as table_type,
REGEXP_CONTAINS(table_id, '^.+[0-9]{8}$') and coalesce(type, 0) in (1, 2) as is_date_shard,
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name
from {{ information_schema.replace(information_schema_view='__TABLES__') }}
where (
{%- for schema in schemas -%}
upper(dataset_id) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{# Here is where we filter the relations so we dont simply fetch all of them #}
{%- if relations_in_project | length > 0 %}
and coalesce(REGEXP_EXTRACT(table_id, '^(.+)_{1}[0-9]{8}$'), table_id) in (
{%- for rel in relations_in_project -%}'{{ rel }}'{%- if not loop.last %}, {% endif -%}{%- endfor -%}
)
{% endif -%}
),
extracted as (
select *,
case
when is_date_shard then shard_base_name
else original_table_name
end as table_name
from tables
),
unsharded_tables as (
select
table_database,
table_schema,
table_name,
coalesce(table_type, 'external') as table_type,
is_date_shard,
struct(
min(shard_name) as shard_min,
max(shard_name) as shard_max,
count(*) as shard_count
) as table_shards,
sum(size_bytes) as size_bytes,
sum(row_count) as row_count,
max(relation_id) as relation_id
from extracted
group by 1,2,3,4,5
),
info_schema_columns as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
table_catalog as table_database,
table_schema,
table_name,
-- use the "real" column name from the paths query below
column_name as base_column_name,
ordinal_position as column_index,
is_partitioning_column,
clustering_ordinal_position
from {{ information_schema.replace(information_schema_view='COLUMNS') }}
where ordinal_position is not null
),
info_schema_column_paths as (
select
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
field_path as column_name,
data_type as column_type,
column_name as base_column_name,
description as column_comment
from {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }}
),
columns as (
select * except (base_column_name)
from info_schema_columns
join info_schema_column_paths using (relation_id, base_column_name)
),
column_stats as (
select
table_database,
table_schema,
table_name,
max(relation_id) as relation_id,
max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned,
max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column,
max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered,
array_to_string(
array_agg(
case
when clustering_ordinal_position is not null then column_name
else null
end ignore nulls
order by clustering_ordinal_position
), ', '
) as clustering_columns
from columns
group by 1,2,3
)
select
unsharded_tables.table_database,
unsharded_tables.table_schema,
case
when is_date_shard then concat(unsharded_tables.table_name, '*')
else unsharded_tables.table_name
end as table_name,
unsharded_tables.table_type,
-- coalesce name and type for External tables - these columns are not
-- present in the COLUMN_FIELD_PATHS resultset
coalesce(columns.column_name, '<unknown>') as column_name,
-- invent a row number to account for nested fields -- BQ does
-- not treat these nested properties as independent fields
row_number() over (
partition by relation_id
order by columns.column_index, columns.column_name
) as column_index,
coalesce(columns.column_type, '<unknown>') as column_type,
columns.column_comment,
'Shard count' as `stats__date_shards__label`,
table_shards.shard_count as `stats__date_shards__value`,
'The number of date shards in this table' as `stats__date_shards__description`,
is_date_shard as `stats__date_shards__include`,
'Shard (min)' as `stats__date_shard_min__label`,
table_shards.shard_min as `stats__date_shard_min__value`,
'The first date shard in this table' as `stats__date_shard_min__description`,
is_date_shard as `stats__date_shard_min__include`,
'Shard (max)' as `stats__date_shard_max__label`,
table_shards.shard_max as `stats__date_shard_max__value`,
'The last date shard in this table' as `stats__date_shard_max__description`,
is_date_shard as `stats__date_shard_max__include`,
'# Rows' as `stats__num_rows__label`,
row_count as `stats__num_rows__value`,
'Approximate count of rows in this table' as `stats__num_rows__description`,
(unsharded_tables.table_type = 'table') as `stats__num_rows__include`,
'Approximate Size' as `stats__num_bytes__label`,
size_bytes as `stats__num_bytes__value`,
'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`,
(unsharded_tables.table_type = 'table') as `stats__num_bytes__include`,
'Partitioned By' as `stats__partitioning_type__label`,
partition_column as `stats__partitioning_type__value`,
'The partitioning column for this table' as `stats__partitioning_type__description`,
is_partitioned as `stats__partitioning_type__include`,
'Clustered By' as `stats__clustering_fields__label`,
clustering_columns as `stats__clustering_fields__value`,
'The clustering columns for this table' as `stats__clustering_fields__description`,
is_clustered as `stats__clustering_fields__include`
-- join using relation_id (an actual relation, not a shard prefix) to make
-- sure that column metadata is picked up through the join. This will only
-- return the column information for the "max" table in a date-sharded table set
from unsharded_tables
left join columns using (relation_id)
left join column_stats using (relation_id)
{%- endset -%}
{%- endif -%}
{%- do log(query) -%}
{%- set results = run_query(query) -%}
{%- do log(schemas ~ ' - rows returned: ' ~ results | length, True) -%}
{{ return(results) }}
{%- endmacro %}
@kevinhoe commented on Aug 3, 2022

Thank you @jeremyyeo! This macro override and step-by-step guide are incredibly helpful! 🙏

@MartyC-137

Thanks @jeremyyeo! This was very helpful.

If anyone winds up here looking for a Snowflake version, here is one I modified from this gist and the dbt-snowflake catalog.sql query:

{% macro snowflake__get_catalog(information_schema, schemas) -%}

    {%- set relations_in_project = [] -%}

    {%- for node in graph.nodes.values() -%}
        {%- if node.resource_type == 'model' -%}
        {%- do relations_in_project.append(node.alias) -%}
        {%- endif -%}
    {%- endfor -%}
    {%- for source in graph.sources.values() -%}
        {%- do relations_in_project.append(source.name) -%}
    {%- endfor -%}
    
    {%- set relations_in_project = set(relations_in_project) | list -%}

    {%- if (schemas | length) == 0 -%}
        {%- set query  = "select 1 as id limit 0" -%}
    {%- else -%}

    {% set query %}
        with tables as (

            select

                table_catalog as "table_database",
                table_schema as "table_schema",
                table_name as "table_name",
                table_type as "table_type",
                comment as "table_comment",
                table_owner as "table_owner",
                'Clustering Key' as "stats:clustering_key:label",
                clustering_key as "stats:clustering_key:value",
                'The key used to cluster this table' as "stats:clustering_key:description",
                (clustering_key is not null) as "stats:clustering_key:include",
                'Row Count' as "stats:row_count:label",
                row_count as "stats:row_count:value",
                'An approximate count of rows in this table' as "stats:row_count:description",
                (row_count is not null) as "stats:row_count:include",
                'Approximate Size' as "stats:bytes:label",
                bytes as "stats:bytes:value",
                'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
                (bytes is not null) as "stats:bytes:include",
                'Last Modified' as "stats:last_modified:label",
                to_varchar(convert_timezone('UTC', last_altered), 'yyyy-mm-dd HH24:MI'||'UTC') as "stats:last_modified:value",
                'The timestamp for last update/change' as "stats:last_modified:description",
                (last_altered is not null and table_type='BASE TABLE') as "stats:last_modified:include"

            from {{ information_schema }}.tables

            where row_count > 0

            and (
                {%- for schema in schemas -%}
                upper("table_schema") = upper('{{ schema }}') {%- if not loop.last %} or {% endif -%}
                {%- endfor -%}
            )

            {%- if relations_in_project | length > 0 %}
            and coalesce(regexp_substr(table_name, '^(.+)_{1}[0-9]{8}$'), table_name) in (
                {%- for rel in relations_in_project -%} upper('{{ rel }}') {%- if not loop.last %}, {% endif -%}{%- endfor -%}
            )
            {% endif -%}

        ),

        columns as (

            select

                table_catalog as "table_database",
                table_schema as "table_schema",
                table_name as "table_name",
                column_name as "column_name",
                ordinal_position as "column_index",
                data_type as "column_type",
                comment as "column_comment"

            from {{ information_schema }}.columns

            where (
                {%- for schema in schemas -%}
                upper("table_schema") = upper('{{ schema }}') {%- if not loop.last %} or {% endif -%}
                {%- endfor -%}
            )

            {%- if relations_in_project | length > 0 %}
            and coalesce(regexp_substr(table_name, '^(.+)_{1}[0-9]{8}$'), table_name) in (
                {%- for rel in relations_in_project -%} upper('{{ rel }}') {%- if not loop.last %}, {% endif -%}{%- endfor -%}
            )
            {% endif -%}

        )

        select * from tables

        inner join columns using ("table_database", "table_schema", "table_name")

        order by "column_index"

    {%- endset -%}

    {%- endif -%}

    {%- do log(query) -%}
    {%- set results = run_query(query) -%}
    {%- do log(schemas ~ ' - rows returned: ' ~ results | length, True) -%}

    {{ return(results) }}

{%- endmacro %}
