Skip to content

Instantly share code, notes, and snippets.

@davehowell
Created November 27, 2019 04:34
Show Gist options
  • Save davehowell/1d6564875f35e58d8da14c9adbcb92da to your computer and use it in GitHub Desktop.
Save davehowell/1d6564875f35e58d8da14c9adbcb92da to your computer and use it in GitHub Desktop.
Some dbt macros for fetching the max value of some column from a target table, useful in incremental models. The canonical subquery example is not performant, and sometimes you need a few different max values.
{% macro max_existing(max_field, target_ref=this) -%}
{#
Gets the max existing value for some field in the target, or some other ref or source.
Use in incremental models, inside `if is_incremental()` or `if execute` blocks,
otherwise the dbt model will not compile.
Useful where you have a primary key or other watermark field and want to construct SQL with
that value determined at compile time.
Why?
When using the max value multiple times in a query it will be better than inlining multiple
subqueries to fetch the same value, and in many cases, hardcoding a value in a where clause
generates a significantly better execution plan.
Params: max_field - string, the name of the field
target_ref - string, pass in a call to ref or source to ensure dag dependency
defaults to this (current context model)
Return: A literal string of the max value - will not have any quoting
Usage: To return dates use `max_existing_date`
To return timestamps use `max_existing_timestamp`
For integers:
SELECT pk_column [,col2 ...]
FROM source_table_ref
WHERE True
{%- if is_incremental() %}
AND some_int_column > {{ max_existing('some_int_column') }}
{%- endif %}
For strings - n.b. probably not useful to use strings as watermarks
SELECT pk_column [,col2 ...]
FROM source_table_ref
WHERE True
{%- if is_incremental() %}
AND some_string_column > '{{ max_existing("some_string_column") }}'
{%- endif %}
#}
{% call statement('get_max_existing', fetch_result=True)-%}
SELECT max({{ max_field }}) as max_existing
FROM {{ '"' ~ this.schema ~ '"' ~ '.' ~ '"' ~ this.name~ '"' if target_ref in [this] else target_ref }}
{%- endcall %}
{% set max_existing_field = load_result('get_max_existing').table.columns['max_existing'].values()[0] %}
{{ return(max_existing_field) }}
{%- endmacro %}
{% macro _max_existing_multirow(max_field, group_by_field) -%}
{#
Internal usage. Gets the max existing values for multiple groups in a target table.
Use in incremental model,
inside `if is_incremental()` or `if execute` otherwise
the dbt model will not compile.
Params:
max_field - column or valid SQL expression to be maxed
group_by_field - column or valid SQL expression to use in group by clause
Return: agate.Table.rows https://agate.readthedocs.io/en/1.6.1/api/table.html
Usage:
See the public version of this macro.
#}
{% call statement('get_max_existing_multirow', fetch_result=True)-%}
SELECT
{{ group_by_field }} as group_by_field,
max({{ max_field }}) AS max_existing
FROM {{ this.schema }}.{{ this.name }}
GROUP BY {{ group_by_field }}
{%- endcall %}
{% set result = load_result('get_max_existing_multirow') %}
{{ return(result.table.rows) }}
{%- endmacro %}
{% macro _two_column_matrix_to_list_dicts(agate_rows, col1_name, col2_name) %}
{#
Converts an agate.Table.rows to a list of dicts
Params: an agate.Table.rows and names of the two columns
Return: list of dicts with keys col1_name and col2_name
#}
{%- set list_dicts = [] -%}
{%- for row in agate_rows -%}
{{- list_dicts.append( {col1_name: row[0], col2_name : row[1] } ) -}}
{%- endfor -%}
{% for dict_val in list_dicts %}
{{ log(dict_val) }}
{%- endfor -%}
{{ return(list_dicts)}}
{% endmacro %}
{% macro max_existing_multirow(max_field, group_by_field) %}
{#
Gets the max existing values for multiple groups in a target table.
Use in incremental models, inside `if is_incremental()` or `if execute`
otherwise the dbt model will not compile.
Params:
max_field - column or valid SQL expression to be maxed
group_by_field - column or valid SQL expression to use in group by clause
Note: expressions should not have column aliases
Return: list of dicts where dict attributes have the keys 'type' and 'max_val'
Usage:
Iterate over the rows e.g.
{%- for row in max_existing_multirow('pk_column', 'type_column') %}
{{ row.pk_column }}
{%- endfor %}
Useful in an incremental model that selects from a `UNION ALL` across
many source models where the unique_key watermark field might not be unique
across those models. A surrogate key, hash or concatenated field in the target is not
adequate to check watermarks across multiple sources.
If you maintain some lineage indicator then it is possible to fetch all the max values
e.g.
Note for this example the fields 'type' and 'pk' in the target model store which parent
table the row comes from and the respective watermark of that table,
in this case a primary key. The watermark could also be some timestamp like "updated_at"
{%- set types = ['ada', 'grace', 'rosie'] %}
{%- if is_incremental() %}
{%- set max_values_list = mathspace.max_existing_multirow('pk', 'type') %}
{%- endif -%}
{%- for t in types %}
SELECT pk, col2, col3, coln FROM {{ ref(t) }}
WHERE True
{%- if is_incremental() %}
AND pk > {{ lookup_max_value(max_values, type) }}
{%- endif -%}
{%- if not loop.last %}
UNION ALL
{%- endif %}
{%- endfor %}
#}
{% set agate_rows = _max_existing_multirow(max_field, group_by_field) %}
{% set list_dicts = _two_column_matrix_to_list_dicts(agate_rows, 'type', 'max_val') %}
{{ return(list_dicts) }}
{% endmacro %}
{% macro lookup_max_value(max_values_list, lookup_value, lookup_key='type', return_key='max_val') %}
{#
Use with the result of the max_existing_multirow macro.
Does a lookup into that list of dicts for a given lookup_key.
Params: max_values - the output of the max_existing_multirow macro, a list of dicts
must have a 'type' and 'max_val' key
Return: literal string of the max value if found or None
None will print blank and probably cause an error in the SQL of your model.
#}
{{ return (max_values_list | selectattr(lookup_key, 'eq', lookup_value) | map(attribute=return_key) | list | first) }}
{% endmacro %}
{% macro _cast_as_type(stringvalue, as_type) %}
{{ return("CAST('" ~ stringvalue ~ "' AS " ~ as_type ~ ")") }}
{% endmacro %}
{% macro max_existing_timestamp(timestampfield, target_ref=this) -%}
{#
Params:
timestampfield: date or timestamp
Returns a SQL expression casting the max value string to a timestamp.
e.g. "CAST('2019-01-01 23:00.000' AS TIMESTAMP)"
#}
{{ return(_cast_as_type(max_existing(timestampfield, target_ref), 'TIMESTAMP')) }}
{%- endmacro %}
{% macro max_existing_date(timestampfield, target_ref=this) -%}
{#
Params:
timestampfield: date or timestamp
target_ref: string, optional, defaults to the current context model
pass in a call to ref or source to target a different model
Returns a SQL expression casting the max value string to a timestamp.
e.g. "CAST('2019-01-01' AS DATE)"
Usage:
current model - latest value of a field called "updated_at":
{{ max_existing_date('updated_at') }}
some other model ref - latest value of a field called "deactivated_at":
{{ max_existing_date('deactivated_at', ref('other_model')) }}
some other model source - latest value of a field called "deactivated_at":
{{ max_existing_date('deactivated_at', source('source_name', 'table_or_view_name')) }}
#}
{{ return(_cast_as_type(max_existing(timestampfield, target_ref), 'DATE')) }}
{%- endmacro %}
{% macro _max_existing_prior_date(datefield, target_ref=this) -%}
{% call statement('max_existing_prior_date', fetch_result=True)-%}
SELECT {{ dbt_utils.dateadd('day', -1, "max(" ~ datefield ~ ")" ) }} as max_existing_prior_date
FROM {{ '"' ~ this.schema ~ '"' ~ '.' ~ '"' ~ this.name~ '"' if target_ref in [this] else target_ref }}
{%- endcall %}
{% set max_existing_prior = load_result('max_existing_prior_date').table.columns['max_existing_prior_date'].values()[0] %}
{{ return(max_existing_prior) }}
{%- endmacro %}
{% macro max_existing_prior_date(datefield, target_ref=this) -%}
{#
A day prior to the max of some date field.
Params, Return and Usage as per the max_existing_date() macro
#}
{{ return(_cast_as_type(_max_existing_prior_date(datefield, target_ref), 'DATE')) }}
{%- endmacro %}
{% macro max_existing_prior_timestamp(datefield, target_ref=this) -%}
{#
A day prior to the max of some timestamp field
Params, Return and Usage as per the max_existing_date() macro
#}
{{ return(_cast_as_type(_max_existing_prior_date(datefield, target_ref), 'TIMESTAMP')) }}
{%- endmacro %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment