@jeremyyeo
Created March 21, 2023 01:02
Unloading new rows to a Snowflake stage with hooks #dbt

  1. First, create the file format, the schema for our stages, and the stage itself:
create or replace file format development_jyeo.dbt_jyeo.my_csv_format type = 'CSV';
create schema if not exists development_jyeo.dbt_jyeo_stages;
create or replace stage development_jyeo.dbt_jyeo_stages.my_stage file_format = development_jyeo.dbt_jyeo.my_csv_format;
  1. Check that our stage exists and is empty:
jyeo_integration#TRANSFORMING@DEVELOPMENT_JYEO.DBT_JYEO>list @development_jyeo.dbt_jyeo_stages.my_stage;
+------+------+-----+---------------+                                           
| name | size | md5 | last_modified |
|------+------+-----+---------------|
+------+------+-----+---------------+
0 Row(s) produced. Time Elapsed: 2.438s
  1. Set up the dbt project:
# dbt_project.yml
name: my_dbt_project
profile: snowflake
config-version: 2
version: 1.0

models:
  my_dbt_project:
    +materialized: table
    
# models/sources.yml
version: 2
sources:
  - name: my_sources
    schema: dbt_jyeo_stages
    tables:
      - name: my_stage

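Note that the default schema (target.schema) for my project is dbt_jyeo, which is where our models are written. Our stage lives in a different schema, dbt_jyeo_stages, so we declare it as a source and reference it with source('my_sources', 'my_stage').

Our foo model is incremental and calls the unload_this() macro as a post-hook: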

-- models/foo.sql
{{
    config(
        materialized = 'incremental',
        post_hook = "{{ unload_this() }}"
    )
}}

select 1 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at

And our unload_this() macro:

-- macros/unload_this.sql
{% macro unload_this() %}

    {% set run_ts = run_started_at.strftime("%Y-%m-%d %H:%M:%S") %}
    {# Replace the special characters in our run_ts so that it looks better when used as csv file names #}
    {% set run_ts_tidy = run_ts.replace(" ", "_").replace(":", "").replace("-", "_") %}

    {% set new_rows_query %}
        select count(*) as num_rows from {{ this }}
        {% if is_incremental() -%}
        where updated_at = '{{ run_ts }}'
        {%- endif %}
    {% endset %}

    {# Note: count(*) over a stage counts the rows across all staged files, not the number of files. #}
    {% set file_count_query %}
        select count(*) as num_rows from @{{ source('my_sources', 'my_stage') }};
    {% endset %}

    {% set unload_query %}

          copy into @{{ source('my_sources', 'my_stage') }}/{{ this.name }}_{{ run_ts_tidy }}.csv.gz
          from (
              select * from {{ this }}
              {% if is_incremental() -%}
              where updated_at = '{{ run_ts }}'
              {%- endif %}
          )
          file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
          single = true;
    {% endset %}

    {% if execute %}
        {% set new_rows_count = run_query(new_rows_query) %}
        {% set file_count_before = run_query(file_count_query).columns[0].values()[0] %}
        {% do log('>>>>> New rows added to [' ~ this ~ ']: ' ~ new_rows_count.columns[0].values()[0], True) %}
        {% do log('>>>>> Running copy command: ' ~ unload_query, True) %}
        {% do run_query(unload_query) %}
        {% set file_count_after = run_query(file_count_query).columns[0].values()[0] %}
        {% do log('>>>>> Row count in stage: Before: ' ~ file_count_before ~ ' / After: ' ~ file_count_after, True) %}
    {% endif %}

{% endmacro %}
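
The file name the macro unloads to comes from plain Python string handling, since Jinja in dbt calls `strftime` and `replace` on the underlying Python objects. As a minimal sketch of that derivation outside dbt (the helper name `unload_file_name` is hypothetical and not part of the project above):

```python
from datetime import datetime


def unload_file_name(model_name: str, run_started_at: datetime) -> str:
    """Build the staged file name the same way the unload_this() macro does.

    Hypothetical helper for illustration only; dbt does this inside Jinja.
    """
    # Same format string the macro uses for run_ts.
    run_ts = run_started_at.strftime("%Y-%m-%d %H:%M:%S")
    # Same replacements as run_ts_tidy: space -> "_", drop ":", "-" -> "_".
    run_ts_tidy = run_ts.replace(" ", "_").replace(":", "").replace("-", "_")
    return f"{model_name}_{run_ts_tidy}.csv.gz"


if __name__ == "__main__":
    # A run that started at 2023-03-21 00:59:28 for a model named "foo".
    print(unload_file_name("foo", datetime(2023, 3, 21, 0, 59, 28)))
```

Because the timestamp has one-second resolution, each run gets its own file as long as two runs of the same model do not start within the same second.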
  1. Let's do our first run:
$ dbt run --full-refresh
00:59:29  Running with dbt=1.3.3
00:59:30  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
00:59:30  
00:59:35  Concurrency: 1 threads (target='dev')
00:59:35  
00:59:35  1 of 1 START sql incremental model dbt_jyeo.foo ................................ [RUN]
00:59:41  >>>>> New rows added to [development_jyeo.dbt_jyeo.foo]: 1
00:59:41  >>>>> Running copy command: 

          copy into @development_jyeo.dbt_jyeo_stages.my_stage/foo_2023_03_21_005928.csv.gz
          from (
              select * from development_jyeo.dbt_jyeo.foo
              
          )
          file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
          single = true;
    
00:59:42  >>>>> Row count in stage: Before: 0 / After: 1
00:59:43  1 of 1 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 1 in 7.76s]
00:59:43  
00:59:43  Finished running 1 incremental model in 0 hours 0 minutes and 13.27 seconds (13.27s).
00:59:43  
00:59:43  Completed successfully
00:59:43  
00:59:43  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  1. Let's change our foo model so that it produces two new rows, then rerun (this time without --full-refresh):
-- models/foo.sql
{{
    config(
        materialized = 'incremental',
        post_hook = "{{ unload_this() }}"
    )
}}

select 2 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at
union
select 3 as id, '{{ run_started_at.strftime("%Y-%m-%d %H:%M:%S") }}' as updated_at
$ dbt run
01:00:16  Running with dbt=1.3.3
01:00:17  Found 1 model, 0 tests, 0 snapshots, 0 analyses, 306 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
01:00:17  
01:00:22  Concurrency: 1 threads (target='dev')
01:00:22  
01:00:22  1 of 1 START sql incremental model dbt_jyeo.foo ................................ [RUN]
01:00:31  >>>>> New rows added to [development_jyeo.dbt_jyeo.foo]: 2
01:00:31  >>>>> Running copy command: 

          copy into @development_jyeo.dbt_jyeo_stages.my_stage/foo_2023_03_21_010015.csv.gz
          from (
              select * from development_jyeo.dbt_jyeo.foo
              where updated_at = '2023-03-21 01:00:15'
          )
          file_format = (format_name = development_jyeo.dbt_jyeo.my_csv_format)
          single = true;
    
01:00:33  >>>>> Row count in stage: Before: 1 / After: 3
01:00:33  1 of 1 OK created sql incremental model dbt_jyeo.foo ........................... [SUCCESS 2 in 11.07s]
01:00:33  
01:00:33  Finished running 1 incremental model in 0 hours 0 minutes and 16.56 seconds (16.56s).
01:00:33  
01:00:33  Completed successfully
01:00:33  
01:00:33  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
  1. Finally, check our stage again to see how many files it contains:
jyeo_integration#TRANSFORMING@DEVELOPMENT_JYEO.DBT_JYEO>list @development_jyeo.dbt_jyeo_stages.my_stage;
+---------------------------------------+------+----------------------------------+-------------------------------+
| name                                  | size | md5                              | last_modified                 |
|---------------------------------------+------+----------------------------------+-------------------------------|
| my_stage/foo_2023_03_21_005928.csv.gz |   48 | 2e381b11110495645e7c04ab2213e0f5 | Tue, 21 Mar 2023 00:59:33 GMT |
| my_stage/foo_2023_03_21_010015.csv.gz |   48 | eb079d7e22df736c2b458b9447e58087 | Tue, 21 Mar 2023 01:00:24 GMT |
+---------------------------------------+------+----------------------------------+-------------------------------+
2 Row(s) produced. Time Elapsed: 1.806s