Skip to content

Instantly share code, notes, and snippets.

@sbailliez
Created June 1, 2024 19:11
Show Gist options
  • Save sbailliez/65c80716790f32444f745f33c4aeeaf5 to your computer and use it in GitHub Desktop.
Save sbailliez/65c80716790f32444f745f33c4aeeaf5 to your computer and use it in GitHub Desktop.
dbt macro (redshift only) to compress a specified set of columns
{#
Compresses the columns of a table to the specified encodings. Unlike the compress_table macro in dbt-redshift,
you have to manually specify the encoding of the columns you want to compress, it does not run and use
the result of `ANALYZE compression <table>` which would is extremely slow to run every time
on a large table, especially when there is nothing to do and the table is already compressed.
If the specified column does not exist or if the specified encoding is identical to the current encoding,
the column is skipped.
ALTER statements are generated for each column where encoding needs to be changed.
Usage:
{{
config(
.....
post_hook= [ "{{ compress_table(this.schema, this.table, { 'mycolumn1': 'zstd', 'mycolumn2: 'az64' }) }}" ]
)
}}
#}
{% macro build_optimized_definition(definition, encodings) -%}
{% set optimized = { } %}
{% set _ = optimized.update(definition) %}
{% set _ = optimized.update({'columns': {} }) %}
{% for name, column in definition['columns'].items() %}
{% if not name in encodings %}
{{ log("Skipping column " ~ definition['schema'] ~ "." ~ definition['name'] ~ "." ~ name ~ " as it is not in the encodings list") }}
{% continue %}
{% endif %}
{% set provided_encoding = encodings[name] %}
{% if provided_encoding != column['encoding'] %}
{{ log("Changing compression of column " ~ definition['schema'] ~ "." ~ definition['name'] ~ "." ~ name ~ ": " ~ column['encoding'] ~ " -> " ~ provided_encoding) }}
{% set _ = optimized['columns'].update({name: definition['columns'][name]}) %}
{% set _ = optimized['columns'][name].update({"encoding": provided_encoding}) %}
{% else %}
{{ log("Ignoring compression of column " ~ definition['schema'] ~ "." ~ definition['name'] ~ "." ~ name ~ ". Already compressed with " ~ column['encoding']) }}
{% endif %}
{% endfor %}
{{ return(optimized) }}
{%- endmacro %}
{%- macro build_alter_sql(def) -%}
{% for column in def['columns'].values() %}
ALTER TABLE "{{ def['schema'] }}"."{{ def['name'] }}" ALTER COLUMN "{{ column['name'] }}" ENCODE {{ column['encoding'] }};
{% endfor %}
{%- endmacro %}
{%- macro compress_table(schema, table, encodings, skip_if_incremental=False) -%}
{% if not execute %}
{{ return(none) }}
{% endif %}
{% if skip_if_incremental and is_incremental() %}
{{ log("Skipping compression of " ~ schema ~ "." ~ table ~ ". skip_if_incremental is enabled") }}
{{ return('') }}
{% endif %}
{% set definition = redshift.fetch_table_definition(schema, table) %}
{% if definition is none %}
{{ log("Skipping compression of " ~ schema ~ "." ~ table ~ ". Not a table.") }}
{{ return(none) }}
{% endif %}
{% set optimized = build_optimized_definition(definition, encodings) %}
{# Build the DDL #}
{{ build_alter_sql(optimized) }}
{%- endmacro %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment