Skip to content

Instantly share code, notes, and snippets.

@kzzzr
Created November 19, 2021 07:33
Show Gist options
  • Save kzzzr/fa3d781238d7138e7dedd067cc4f772b to your computer and use it in GitHub Desktop.
Save kzzzr/fa3d781238d7138e7dedd067cc4f772b to your computer and use it in GitHub Desktop.
Prepare WHERE condition dynamically – dbt macro
-- Select audit events from the raw `wheely.audit` source.
--   * `_id` is exposed under the name `audit_id`
--   * the "data" column is passed through json_parse and keeps its name
-- The trailing macro call renders the whole `where` clause: it excludes rows
-- flagged as deleted and, on the dev / ci targets, keeps only recent rows
-- according to `created_at`.
select
    _id as audit_id,
    resource_id,
    driver_id,
    "user_id",
    "event",
    created_at,
    resource_type,
    json_parse("data") as "data"
from {{ source('wheely', 'audit') }}
{{ hevo_filter_rows(last_n_days_of_data=true, timestamp_column='created_at') }}
-- filter out rows flagged as deleted; limit dev and ci runs to the most recent days of data
{#
    hevo_filter_rows renders a complete `where` clause to place right after a
    model's `from` clause.

    Arguments:
      deleted_rows_filter  when true (default), adds `__hevo__marked_deleted is not true`.
      last_n_days_of_data  when true AND target.name is 'dev' or 'ci', adds
                           `<timestamp_column> >= '<watermark>'`, where the watermark is
                           current date minus var('development_days_of_data') days,
                           fetched from the warehouse with run_query. Default: false.
      timestamp_column     column compared against the watermark (default 'created_at').
                           The special value '__hevo__ingested_at' is first passed
                           through the epoch_to_timestamp macro.

    Every filter that is switched off is rendered as `1 = 1`, so the clause always
    has the same shape.

    The watermark query runs only when the dev filter actually applies and dbt is
    in its execute phase. Consequently var('development_days_of_data') is only
    required on the dev / ci targets, and no warehouse round-trip happens for
    other targets or during parsing (where the filter renders as `1 = 1`).
#}
{% macro hevo_filter_rows(
        deleted_rows_filter=true,
        last_n_days_of_data=false,
        timestamp_column='created_at'
    ) -%}

    {#- prepare expression to filter deleted rows by flag __hevo__marked_deleted where it exists -#}
    {%- set deleted_rows_expression = '__hevo__marked_deleted is not true' if deleted_rows_filter == true else '1 = 1' -%}

    {#- cast epoch to timestamp if necessary -#}
    {%- set timestamp_column = epoch_to_timestamp('__hevo__ingested_at')
        if timestamp_column == '__hevo__ingested_at' else timestamp_column -%}

    {#- prepare expression to filter rows to last 'development_days_of_data' (e.g. last 3 days) -#}
    {%- set apply_dev_filter = target.name in ['dev', 'ci'] and last_n_days_of_data == true -%}
    {#- neutral default: used when the filter is off and while dbt is only parsing -#}
    {%- set dev_rows_expression = '1 = 1' -%}
    {%- if apply_dev_filter and execute -%}
        {%- set get_dev_watermark_query = 'select dateadd(day, ' ~ -1 * var('development_days_of_data') ~ ', current_timestamp::date)' -%}
        {#- first column, first row of the result; wrapped in single quotes to form a SQL literal -#}
        {%- set dev_watermark = "'" ~ run_query(get_dev_watermark_query).columns[0][0] ~ "'" -%}
        {%- set dev_rows_expression = timestamp_column ~ ' >= ' ~ dev_watermark -%}
    {%- endif -%}

    {#- prepare final filter expression (no right-trim: keep a line break before `where`) #}
where 1 = 1
and {{ deleted_rows_expression }}
and {{ dev_rows_expression }}
{%- endmacro -%}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment