Skip to content

Instantly share code, notes, and snippets.

@drewbanin
Last active February 7, 2018 00:54
Show Gist options
  • Save drewbanin/80f6d0d3ed7e2518b568e04ad8da64b7 to your computer and use it in GitHub Desktop.
Save drewbanin/80f6d0d3ed7e2518b568e04ad8da64b7 to your computer and use it in GitHub Desktop.
-- USAGE:
-- In a model file, select the `insert` materialization and supply the three
-- config keys it reads with config.require():
--   dest_table : existing destination table written as "schema.table"; its
--                current columns form the insert column list
--   date_field : column used as the high-water mark; only rows whose value is
--                greater than the destination's current max(date_field) are
--                inserted, one week-long window at a time
--   start_date : fallback high-water mark used while the destination is empty
-- Rows are only inserted when dbt is run with --full-refresh; on other runs
-- each window is reported as a NO-OP instead of being executed.
{{
config(
materialized='insert',
dest_table='dbt_dbanin.table_to_insert_into',
date_field="timestamp",
start_date='2017-07-01'
)
}}
-- The select must return every destination column as well as date_field.
select ...
---------------------------------------------------------------------------
{#
  get_max_timestamp: run one query, stored under the statement name 'max_ts',
  that returns a single row with two columns which callers read positionally
  via load_result('max_ts')['data'][0]:
    [0] max_timestamp : largest date_field already in "dest_schema"."dest_table",
                        or start_date when the table has no rows (max() over no
                        rows is NULL, so coalesce falls back to start_date).
    [1] weeks_to_now  : datediff('week', max_timestamp, getdate()) + 1, i.e. the
                        number of one-week windows the caller iterates over.
                        The unit is weeks; the column was previously aliased
                        "days_to_now", which misdescribed it.
  datediff/getdate are Redshift functions.
#}
{% macro get_max_timestamp(dest_schema, dest_table, date_field, start_date) -%}
{%- call statement('max_ts', fetch_result=True) -%}
with data as (
    select
        coalesce(max("{{ date_field }}"), '{{ start_date }}')::timestamp as max_timestamp
    from "{{ dest_schema }}"."{{ dest_table }}"
)
select
    max_timestamp,
    -- +1 so the current, partially elapsed week is also covered
    datediff('week', max_timestamp, getdate()) + 1 as weeks_to_now
from data
{%- endcall -%}
{%- endmacro %}
{#
  get_insert_sql: render (not execute) the SQL that appends one window of the
  model's rows to "dest_schema"."dest_table" inside an explicit begin/commit.

  The window is (max_ts + offset weeks, max_ts + offset weeks + 1 week]:
  exclusive at the bottom so rows equal to the high-water mark max_ts are not
  inserted again, inclusive at the top so consecutive offsets tile the
  timeline with no gap or overlap.

  Args:
    dest_cols_csv  : quoted, comma-separated destination columns; used as both
                     the insert column list and the select list, so the model
                     SQL must produce every one of them.
    sql            : the model's select statement, wrapped as a derived table.
    date_field     : column the window is applied to (and rows ordered by).
    start_date     : not used in this macro; kept so callers' arguments line up.
    max_ts         : high-water mark, rendered as a timestamp literal.
    offset         : 0-based index of the one-week window.
    wlm_slot_count : Redshift WLM slots claimed for the session before the
                     insert (default 3, the previously hard-coded value).
#}
{% macro get_insert_sql(dest_schema, dest_table, dest_cols_csv, sql, date_field, start_date, max_ts, offset = 0, wlm_slot_count = 3) %}
{# NOTE(review): wlm_query_slot_count is a session setting and is never reset
   here, so later statements on the same connection may keep this slot count. #}
set wlm_query_slot_count to {{ wlm_slot_count }};
begin;
insert into "{{ dest_schema }}"."{{ dest_table }}" ({{ dest_cols_csv }})
(
    select
        {{ dest_cols_csv }}
    from (
{{ sql }}
    ) as model_rows {# PostgreSQL (before 16) rejects an unaliased subquery in FROM; harmless elsewhere #}
    where (
        "{{ date_field }}" > '{{ max_ts }}'::timestamp + interval '{{ offset }} week'
        and "{{ date_field }}" <= '{{ max_ts }}'::timestamp + interval '{{ offset }} week' + interval '1 week'
    )
    {# TODO: make the one-week window length configurable; it must stay in step
       with the datediff('week', ...) unit that sizes the caller's loop. #}
    order by "{{ date_field }}"
);
commit;
{% endmacro %}
{#
  run_for_day: handle the single insert window identified by `offset`.
  The SQL comes from get_insert_sql. The statement is named 'main' for the
  first window and 'main-<offset>' for later ones.
  With the full-refresh flag set, the insert is executed via statement().
  Without it, the insert is only reported through noop_statement with status
  "NO-OP" and an empty result.
#}
{% macro run_for_day(dest_schema, dest_table, dest_cols_csv, sql, date_field, start_date, max_ts, offset) %}
{%- if offset == 0 -%}
  {%- set name = 'main' -%}
{%- else -%}
  {%- set name = 'main-' ~ offset -%}
{%- endif -%}
{#- Rendering is side-effect free, so build the SQL once for either branch. -#}
{%- set insert_sql = get_insert_sql(dest_schema, dest_table, dest_cols_csv, sql, date_field, start_date, max_ts, offset) -%}
{% if not flags.FULL_REFRESH %}
{% call noop_statement(name, status="NO-OP", res=[]) -%}
-- Not running : only insert in sinter full refresh
{{ insert_sql }}
{%- endcall %}
{% else %}
{%- call statement(name) -%}
{{ insert_sql }}
{% endcall %}
{% endif %}
{% endmacro %}
{#
  `insert` materialization: append rows produced by the model's SQL (`sql`)
  to an existing table named by the required config `dest_table`. That table
  is given as "schema.table"; double quotes are stripped before splitting.

  The destination's current max(date_field), or `start_date` when it is empty,
  is the high-water mark. get_max_timestamp also returns how many one-week
  windows to process. run_for_day is then called once per window (offset
  0..n-1), and each window is its own begin/commit when executed.
#}
{% materialization insert, default -%}
{%- set dest_identifier = config.require('dest_table') -%}
{%- set date_field = config.require('date_field') -%}
{%- set start_date = config.require('start_date') -%}
{#- Unpacking into two names requires exactly one "." in dest_table. -#}
{%- set dest_schema, dest_table = (dest_identifier | replace('"', '')).split(".") -%}
{%- set dest_columns = adapter.get_columns_in_table(dest_schema, dest_table) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{#- Run the high-water-mark query once and read its single row positionally. -#}
{%- set _ = get_max_timestamp(dest_schema, dest_table, date_field, start_date) -%}
{%- set max_ts_row = load_result('max_ts')['data'][0] -%}
{%- set max_ts = max_ts_row[0] | string -%}
{#- Column 1 is a count of WEEKS (datediff('week', ...) + 1), not days. -#}
{%- set num_weeks = max_ts_row[1] | int -%}
{#- Commit each week as a separate transaction. -#}
{%- for i in range(num_weeks) -%}
{{ log(" + Running for week " ~ (i + 1) ~ " of " ~ num_weeks, info=True) }}
{%- set _ = run_for_day(dest_schema, dest_table, dest_cols_csv, sql, date_field, start_date, max_ts, i) -%}
{%- endfor -%}
{%- endmaterialization %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment