@jeremyyeo
Last active February 24, 2022 11:07

Customising the dbt-event-logging package

As of dbt-event-logging 0.6.0, the only customisation the package supports is redefining the schema in which the dbt_audit_log table is created (see the package readme). If you want to customise the columns that get recorded in the dbt_audit_log table itself, for example by adding dbt_cloud_run_id, you will have to get knee-deep in the source macros.

To make things easier, we can copy most of the code from the package's audit.sql macro file, put it in our own macros folder (i.e. macros/audit.sql), and make some tweaks to it. Because the macros are then part of our project, we no longer need to install the dbt-event-logging package via packages.yml. Note that the copied macros still call dbt_utils (for example dbt_utils.type_string() and dbt_utils.current_timestamp_in_utc()), so dbt_utils itself must remain installed.

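The example audit.sql file in this gist shows how to add 2 additional columns, dbt_cloud_run_id and dbt_cloud_job_id, populated from special environment variables that are available during a dbt Cloud job run. Outside dbt Cloud, where those variables are not set, the columns fall back to the defaults 'manual-run' and 'ad-hoc-job'.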
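
The same pattern extends to further columns. As a rough sketch (not part of the original example), adding a third column touches two macros in audit.sql: the column is declared in required_columns and then populated in log_audit_event. The column name dbt_cloud_project_id, the DBT_CLOUD_PROJECT_ID variable, and the 'no-project' default are assumptions chosen for illustration, and only the added lines are shown:

```sql
{# Assumed example: dbt_cloud_project_id is a hypothetical extra column. #}

{# 1. In create_audit_log_table(): add one entry to the existing required_columns list. #}
["dbt_cloud_project_id", dbt_utils.type_string()],

{# 2. In log_audit_event(): add the column name to the insert column list... #}
dbt_cloud_project_id,

{# 3. ...and the matching value, at the same position, in the values list. #}
{#    The second argument to env_var() is the fallback used when the variable is unset. #}
'{{ env_var("DBT_CLOUD_PROJECT_ID", "no-project") }}',
```

Because create_audit_log_table() compares required_columns against the columns already in the table, an existing dbt_audit_log table should pick up the new column through the alter table branch on the next run, without having to be dropped first.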

Then in our dbt_project.yml file, we can call those macros like so:

...

on-run-start:
  - "{{ create_audit_schema() }}"
  - "{{ create_audit_log_table() }}"
  - "{{ log_run_start_event() }}"

on-run-end:
  - "{{ log_run_end_event() }}"

models:
  my_project_name:
    +pre-hook: "{{ log_model_start_event() }}"
    +post-hook: "{{ log_model_end_event() }}"

...
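
Once the hooks are running, the two new columns can be used to tie audit rows back to a specific dbt Cloud run. The query below is a minimal sketch of that idea, assuming target.schema is analytics (so that get_audit_schema() resolves to analytics_meta); the schema name is hypothetical and should be replaced with your own:

```sql
-- Hypothetical schema: with target.schema = 'analytics',
-- the audit table lives in analytics_meta.dbt_audit_log.
select
    dbt_cloud_run_id,
    dbt_cloud_job_id,
    invocation_id,
    event_schema,
    event_model,
    -- pair up the start and end events written by the model pre/post hooks
    min(case when event_name = 'model deployment started' then event_timestamp end) as started_at,
    max(case when event_name = 'model deployment completed' then event_timestamp end) as completed_at
from analytics_meta.dbt_audit_log
where event_name in ('model deployment started', 'model deployment completed')
group by
    dbt_cloud_run_id,
    dbt_cloud_job_id,
    invocation_id,
    event_schema,
    event_model
order by started_at;
```

invocation_id is kept in the grouping because every run outside dbt Cloud shares the same default dbt_cloud_run_id, so it is what separates one local invocation from another.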
{# macros/audit.sql #}
{% macro get_audit_schema() %}
    {{ return(target.schema ~ '_meta') }}
{% endmacro %}

{% macro get_audit_relation() %}
    {%- set audit_schema = get_audit_schema() -%}
    {%- set audit_table =
        api.Relation.create(
            database = target.database,
            schema = audit_schema,
            identifier = 'dbt_audit_log',
            type = 'table'
        ) -%}
    {{ return(audit_table) }}
{% endmacro %}

{% macro create_audit_schema() %}
    {%- set schema_name = get_audit_schema() -%}
    {%- set schema_exists = adapter.check_schema_exists(database=target.database, schema=schema_name) -%}
    {# check_schema_exists returns a boolean, so test it directly rather than comparing to 0 #}
    {% if not schema_exists %}
        {% do create_schema(api.Relation.create(
            database = target.database,
            schema = schema_name)
        ) %}
    {% endif %}
{% endmacro %}

{% macro create_audit_log_table() -%}
    {# 2 new columns are added to the required_columns array below #}
    {% set required_columns = [
        ["dbt_cloud_run_id", dbt_utils.type_string()],
        ["dbt_cloud_job_id", dbt_utils.type_string()],
        ["event_name", dbt_utils.type_string()],
        ["event_timestamp", dbt_utils.type_timestamp()],
        ["event_schema", dbt_utils.type_string()],
        ["event_model", dbt_utils.type_string()],
        ["event_user", dbt_utils.type_string()],
        ["event_target", dbt_utils.type_string()],
        ["event_is_full_refresh", "boolean"],
        ["invocation_id", dbt_utils.type_string()],
    ] -%}
    {% set audit_table = get_audit_relation() -%}
    {% set audit_table_exists = adapter.get_relation(audit_table.database, audit_table.schema, audit_table.name) -%}
    {% if audit_table_exists -%}
        {%- set columns_to_create = [] -%}
        {# map to lower to cater for snowflake returning column names as upper case #}
        {%- set existing_columns = adapter.get_columns_in_relation(audit_table)|map(attribute='column')|map('lower')|list -%}
        {%- for required_column in required_columns -%}
            {%- if required_column[0] not in existing_columns -%}
                {%- do columns_to_create.append(required_column) -%}
            {%- endif -%}
        {%- endfor -%}
        {%- for column in columns_to_create -%}
            alter table {{ audit_table }}
            add column {{ column[0] }} {{ column[1] }}
            default null;
        {% endfor -%}
        {%- if columns_to_create|length > 0 %}
            commit;
        {% endif -%}
    {%- else -%}
        create table if not exists {{ audit_table }}
        (
        {% for column in required_columns %}
            {{ column[0] }} {{ column[1] }}{% if not loop.last %},{% endif %}
        {% endfor %}
        )
    {%- endif -%}
{%- endmacro %}

{% macro log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}
    {#- 2 new values are inserted for the 2 new columns added -#}
    insert into {{ get_audit_relation() }} (
        dbt_cloud_run_id,
        dbt_cloud_job_id,
        event_name,
        event_timestamp,
        event_schema,
        event_model,
        event_user,
        event_target,
        event_is_full_refresh,
        invocation_id
    )
    values (
        '{{ env_var("DBT_CLOUD_RUN_ID", "manual-run") }}',
        '{{ env_var("DBT_CLOUD_JOB_ID", "ad-hoc-job") }}',
        '{{ event_name }}',
        {{ dbt_utils.current_timestamp_in_utc() }},
        {% if schema != None %}'{{ schema }}'{% else %}null::varchar(512){% endif %},
        {% if relation != None %}'{{ relation }}'{% else %}null::varchar(512){% endif %},
        {% if user != None %}'{{ user }}'{% else %}null::varchar(512){% endif %},
        {% if target_name != None %}'{{ target_name }}'{% else %}null::varchar(512){% endif %},
        {% if is_full_refresh %}TRUE{% else %}FALSE{% endif %},
        '{{ invocation_id }}'
    );
    commit;
{% endmacro %}

{% macro log_run_start_event() %}
    {{ log_audit_event('run started', user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH) }}
{% endmacro %}

{% macro log_run_end_event() %}
    {{ log_audit_event('run completed', user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH) }}
{% endmacro %}

{% macro log_model_start_event() %}
    {{ log_audit_event(
        'model deployment started', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
    ) }}
{% endmacro %}

{% macro log_model_end_event() %}
    {{ log_audit_event(
        'model deployment completed', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
    ) }}
{% endmacro %}