Skip to content

Instantly share code, notes, and snippets.

@jtalmi
Last active October 2, 2020 16:57
Show Gist options
  • Save jtalmi/972c1a46692ab77376cfcf263027be7a to your computer and use it in GitHub Desktop.
Save jtalmi/972c1a46692ab77376cfcf263027be7a to your computer and use it in GitHub Desktop.
An insert_by_period macro for Snowflake, with support for short sample windows when target = dev
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}
  {#-
    Runs a query against target_schema.target_table and stores its single
    result row under the statement name 'period_boundaries' (fetch_result=True).

    Columns of the fetched row, in order:
      1. start_timestamp - max(timestamp_field) in the target table, falling
                           back to start_date when that max is null.
      2. stop_timestamp  - one millisecond before stop_date when stop_date is
                           a non-empty string; otherwise one millisecond before
                           the current timestamp truncated to `period`.
      3. num_periods     - datediff(start_timestamp, stop_timestamp, period) + 1.

    Arguments:
      target_schema, target_table - interpolated as schema.table in the FROM clause.
      timestamp_field             - column expression the boundaries are based on.
      start_date                  - interpolated verbatim as a SQL expression, so a
                                    literal must already carry its own quotes.
      stop_date                   - plain string; wrapped in quotes here, and an
                                    empty string is turned into null via nullif.
      period                      - date part passed to date_trunc / datediff.
  -#}
  {%- set explicit_stop = dbt_utils.dateadd(
        'millisecond',
        -1,
        "nullif('" ~ stop_date ~ "','')::timestamp"
      ) -%}
  {%- set current_period_start = dbt_utils.date_trunc(period, dbt_utils.current_timestamp()) -%}
  {%- set default_stop = dbt_utils.dateadd('millisecond', -1, current_period_start) -%}
  {%- set period_span = dbt_utils.datediff('start_timestamp', 'stop_timestamp', period) -%}

  {% call statement('period_boundaries', fetch_result=True) -%}
    with bounds as (

        select
            coalesce(max({{ timestamp_field }}), {{ start_date }})::timestamp as start_timestamp,
            coalesce(
                {{ explicit_stop }},
                {{ default_stop }}
            )::timestamp as stop_timestamp
        from {{ target_schema }}.{{ target_table }}

    )

    select
        start_timestamp,
        stop_timestamp,
        {{ period_span }} + 1 as num_periods
    from bounds
  {%- endcall %}
{%- endmacro %}
{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}
  {#-
    Returns a select of target_cols_csv over the model `sql`, with every
    occurrence of the __PERIOD_FILTER__ placeholder replaced by a predicate
    that keeps only rows of one period.

    The predicate keeps rows whose timestamp_field is
      - strictly after   start_timestamp + offset periods,
      - at or before     start_timestamp + (offset + 1) periods, and
      - strictly before  stop_timestamp.

    start_timestamp and stop_timestamp are interpolated as quoted string
    literals and cast to timestamp; offset and period are interpolated into
    interval literals.
  -#}
  {%- set window_open = "'" ~ start_timestamp ~ "'::timestamp + interval '" ~ offset ~ " " ~ period ~ "'" -%}
  {%- set window_close = window_open ~ " + interval '1 " ~ period ~ "'" -%}
  {%- set hard_stop = "'" ~ stop_timestamp ~ "'::timestamp" -%}
  {%- set period_filter -%}
    ({{ timestamp_field }} > {{ window_open }} and
     {{ timestamp_field }} <= {{ window_close }} and
     {{ timestamp_field }} < {{ hard_stop }})
  {%- endset -%}
  {%- set windowed_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}
  select
    {{ target_cols_csv }}
  from (
    {{ windowed_sql }}
  )
{%- endmacro %}
{% materialization snowflake_insert_by_period, adapter='snowflake' -%}
  {#
    Builds the model's table one period at a time: for each period a temporary
    table is created from the model SQL (with its __PERIOD_FILTER__ placeholder
    replaced by that period's time window) and its rows are inserted into the
    target table.

    Model config read here:
      timestamp_field      (required) column used for the period windows.
      start_date           (required unless target.name is 'dev') lower bound
                           used when the target table has no rows yet.
      stop_date            (optional) upper bound; defaults to ''.
      period               (optional) date part per iteration; defaults to 'day'.
      dev_datepart,
      dev_lookback_periods (optional) when both are set and target.name is
                           'dev', the build starts that far before the current
                           timestamp and a full refresh is forced.

    Raises a compiler error when the model SQL does not contain
    __PERIOD_FILTER__. Reports "INSERT <total rows>" as the 'main' status and
    returns the target relation.
  #}

  {%- set timestamp_field = config.require('timestamp_field') -%}
  {%- set dev_datepart = config.get('dev_datepart') -%}
  {%- set dev_lookback_periods = config.get('dev_lookback_periods') -%}

  {# start_date ends up as a SQL expression: a dateadd() call in dev, or a
     quoted literal built from the configured value everywhere else. #}
  {% if target.name == 'dev' and dev_datepart and dev_lookback_periods %}
    {%- set start_date = dbt_utils.dateadd(datepart=dev_datepart, interval="-" ~ dev_lookback_periods, from_date_or_timestamp=dbt_utils.current_timestamp()) -%}
  {% elif target.name == 'dev' %}
    {# dev without an explicit lookback window: start one hour back #}
    {%- set start_date = dbt_utils.dateadd(datepart='hour', interval="-1", from_date_or_timestamp=dbt_utils.current_timestamp()) -%}
  {% else %}
    {%- set start_date = "'" ~ config.require('start_date') ~ "'" -%}
  {% endif %}

  {# Fix: this tag previously ended with an extra closing brace, which
     emitted a stray literal character into the rendered output. #}
  {%- set stop_date = config.get('stop_date') or '' -%}
  {%- set period = config.get('period') or 'day' -%}

  {%- if sql.find('__PERIOD_FILTER__') == -1 -%}
    {%- set error_message -%}
      Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql
    {%- endset -%}
    {{ exceptions.raise_compiler_error(error_message) }}
  {%- endif -%}

  {%- set identifier = model['name'] -%}

  {# NOTE(review): old_relation is looked up with database=database, while
     target_relation is created without a database - confirm this is intended
     for models that set a custom database. #}
  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}

  {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
  {# The dev lookback window always rebuilds from scratch. #}
  {% if target.name == 'dev' and dev_datepart and dev_lookback_periods %}
    {%- set full_refresh_mode = True -%}
  {% else %}
    {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
  {% endif %}

  {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
  {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

  {# Truncate only for a non-destructive full refresh of an existing table;
     otherwise drop on full refresh or when the existing relation is not a table. #}
  {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
  {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
  {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

  -- setup
  {% if old_relation is none -%}
    -- noop
  {%- elif should_truncate -%}
    {{adapter.truncate_relation(old_relation)}}
  {%- elif should_drop -%}
    {{adapter.drop_relation(old_relation)}}
    {%- set old_relation = none -%}
  {%- endif %}

  {{run_hooks(pre_hooks, inside_transaction=False)}}

  -- `begin` happens here, so `commit` after it to finish the transaction
  {{run_hooks(pre_hooks, inside_transaction=True)}}
  {% call statement() -%}
    begin; -- make extra sure we've closed out the transaction
    commit;
  {%- endcall %}

  -- build model
  {% if force_create or old_relation is none -%}
    {# Create an empty target table -#}
    {% call statement('main') -%}
      {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%}
      {{create_table_as(False, target_relation, empty_sql)}}
    {%- endcall %}
  {%- endif %}

  {# Runs the boundaries query; its single result row is read back below
     through load_result('period_boundaries'). #}
  {% set _ = snaptravel.get_period_boundaries(schema,
                                              identifier,
                                              timestamp_field,
                                              start_date,
                                              stop_date,
                                              period) %}
  {%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%}
  {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%}
  {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%}

  {%- set msg = "Start timestamp: " ~ start_timestamp ~ "\nStop timestamp: " ~ stop_timestamp ~ "\nNo. of periods: " ~ num_periods -%}
  {{ dbt_utils.log_info(msg) }}

  {% set target_columns = adapter.get_columns_in_relation(target_relation) %}
  {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}

  {# A dict is used as the accumulator because a plain `set` inside the
     for-loop body would not be visible after the loop ends. #}
  {%- set loop_vars = {'sum_rows_inserted': 0} -%}

  -- commit each period as a separate transaction
  {% for i in range(num_periods) -%}
    {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%}
    {{ dbt_utils.log_info(msg) }}

    {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
                                               schema=schema, type='table') -%}

    {# Stage this period's rows in a temporary table. #}
    {% call statement() -%}
      {% set tmp_table_sql = snaptravel.get_period_sql(target_cols_csv,
                                                       sql,
                                                       timestamp_field,
                                                       period,
                                                       start_timestamp,
                                                       stop_timestamp,
                                                       i) %}
      {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}}
    {%- endcall %}

    {{adapter.expand_target_column_types(from_relation=tmp_relation,
                                         to_relation=target_relation)}}

    {%- set name = 'main-' ~ i -%}
    {% call statement(name, fetch_result=True) -%}
      insert into {{target_relation}} ({{target_cols_csv}})
      (
          select
              {{target_cols_csv}}
          from {{tmp_relation.include(schema=False)}}
      );
    {%- endcall %}

    {# The row count is taken from the second space-separated token of the
       statement status - presumably of the form "<word> <n>"; verify against
       the adapter's status format. #}
    {%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[1] | int -%}
    {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
    {# dict.update() is called inside an `if` purely for its side effect #}
    {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

    {%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
    {{ dbt_utils.log_info(msg) }}
  {%- endfor %}

  {% call statement() -%}
    begin;
  {%- endcall %}

  {{run_hooks(post_hooks, inside_transaction=True)}}

  {% call statement() -%}
    commit;
  {%- endcall %}

  {{run_hooks(post_hooks, inside_transaction=False)}}

  {# Report the total inserted across all periods as the model's status. #}
  {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}
  {% call noop_statement(name='main', status=status_string) -%}
    -- no-op
  {%- endcall %}

  -- Return the relations created in this materialization
  {{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment