Skip to content

Instantly share code, notes, and snippets.

@adinsmoor
Created August 1, 2022 05:55
Show Gist options
  • Save adinsmoor/bc5a739ae647931c78dc707896ebc613 to your computer and use it in GitHub Desktop.
Save adinsmoor/bc5a739ae647931c78dc707896ebc613 to your computer and use it in GitHub Desktop.
Logging package customization example: add row_count
-- add a dispatch command to your dbt_project.yml file (where my_project is your project name)
dispatch:
- macro_namespace: logging
search_order: ['my_project', ' logging']
-- modify the macro with desired customizations
{% macro default__log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}
insert into {{ logging.get_audit_relation() }} (
event_name,
event_timestamp,
event_schema,
event_model,
event_user,
event_target,
event_is_full_refresh,
invocation_id,
row_count
)
values (
'{{ event_name }}',
{{ dbt_utils.current_timestamp_in_utc() }},
{% if schema != None %}'{{ schema }}'{% else %}null::varchar(512){% endif %},
{% if relation != None %}'{{ relation }}'{% else %}null::varchar(512){% endif %},
{% if user != None %}'{{ user }}'{% else %}null::varchar(512){% endif %},
{% if target_name != None %}'{{ target_name }}'{% else %}null::varchar(512){% endif %},
{% if is_full_refresh %}TRUE{% else %}FALSE{% endif %},
'{{ invocation_id }}',
{% if event_name in ('model deployment completed') %}
(select count(*) from {{ schema }}.{{ relation }})
{% else %} null {% endif %}
);
commit;
{% endmacro %}
{% macro default__create_audit_log_table() -%}
{% set required_columns = [
["event_name", dbt_utils.type_string()],
["event_timestamp", dbt_utils.type_timestamp()],
["event_schema", dbt_utils.type_string()],
["event_model", dbt_utils.type_string()],
["event_user", dbt_utils.type_string()],
["event_target", dbt_utils.type_string()],
["event_is_full_refresh", "boolean"],
["invocation_id", dbt_utils.type_string()],
["row_count", dbt_utils.type_numeric()],
] -%}
{% set audit_table = logging.get_audit_relation() -%}
{% set audit_table_exists = adapter.get_relation(audit_table.database, audit_table.schema, audit_table.name) -%}
{% if audit_table_exists -%}
{%- set columns_to_create = [] -%}
{# map to lower to cater for snowflake returning column names as upper case #}
{%- set existing_columns = adapter.get_columns_in_relation(audit_table)|map(attribute='column')|map('lower')|list -%}
{%- for required_column in required_columns -%}
{%- if required_column[0] not in existing_columns -%}
{%- do columns_to_create.append(required_column) -%}
{%- endif -%}
{%- endfor -%}
{%- for column in columns_to_create -%}
alter table {{ audit_table }}
add column {{ column[0] }} {{ column[1] }}
default null;
{% endfor -%}
{%- if columns_to_create|length > 0 %}
commit;
{% endif -%}
{%- else -%}
create table if not exists {{ audit_table }}
(
{% for column in required_columns %}
{{ column[0] }} {{ column[1] }}{% if not loop.last %},{% endif %}
{% endfor %}
)
{%- endif -%}
{%- endmacro %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment