Skip to content

Instantly share code, notes, and snippets.

@anatolec
Last active December 8, 2023 21:10
Show Gist options
  • Save anatolec/63246f72956d8c1f50b5a2827d3b2d95 to your computer and use it in GitHub Desktop.
Save anatolec/63246f72956d8c1f50b5a2827d3b2d95 to your computer and use it in GitHub Desktop.
A dbt analysis to retrieve schema changes from Airbyte internal data
{#
  dbt analysis (BigQuery dialect): report the most recent schema change of
  every stream found in the Airbyte raw tables.

  For each relation returned by dbt_utils.get_relations_by_prefix, the matching
  raw stream table in airbyte_internal is scanned and every record is reduced
  to a "column signature": the sorted, comma-joined list of top-level keys of
  _airbyte_data. The two most recently seen signatures of a table are compared.

  Output columns:
    change_type  "New column" or "Removed column"
    table        relation identifier the change belongs to
    column       key that appeared in / disappeared from the newest signature
    min_date     first _airbyte_extracted_at at which the newest signature
                 was observed, i.e. when the change was first seen

  Usage: replace both occurrences of the dataset-name placeholder below before
  compiling. The prefix argument is an empty string, so presumably every
  relation of the dataset is matched -- verify against the dbt_utils docs.

  Known limitation: signatures are comma-joined strings, so a JSON key that
  itself contains a comma is split into several "columns".
#}
{% set relations = dbt_utils.get_relations_by_prefix('<<YOUR_DATASET_NAME>>', '') %}

with base as (
    {# One sub-select per relation: its two most recently seen signatures. #}
    {% for relation in relations %}
    (
        select
            "{{ relation.identifier }}" as table,
            array_to_string(
                array(
                    select col
                    from unnest(bigfunctions.eu.json_keys(_airbyte_data)) as col
                    order by col
                ),
                ','
            ) as columns,
            min(_airbyte_extracted_at) as min_date,
            max(_airbyte_extracted_at) as max_date
        from airbyte_internal.<<YOUR_DATASET_NAME>>_raw__stream_{{ relation.identifier }}
        group by columns
        {# Tie-breakers keep the two retained signatures deterministic. #}
        order by max_date desc, min_date desc, columns
        limit 2
    )
    {% if not loop.last %}union all{% endif %}
    {% else %}
    {#
      No relation found: emit a typed, zero-row result so the compiled SQL
      stays valid instead of rendering an empty CTE body.
      NOTE(review): assumes _airbyte_extracted_at is a TIMESTAMP -- confirm.
    #}
    select
        cast(null as string) as table,
        cast(null as string) as columns,
        cast(null as timestamp) as min_date,
        cast(null as timestamp) as max_date
    limit 0
    {% endfor %}
),

-- Keep only tables that expose two distinct signatures, and number them once:
-- recency = 1 is the newest signature, recency = 2 the one before it.
-- row_number with tie-breakers (instead of rank on max_date alone) guarantees
-- exactly one row per recency even when both signatures share a max_date;
-- with rank, such a tie marks both rows as "latest" and as "previous", which
-- makes every set difference below come out empty.
tables_with_changes as (
    select
        table,
        columns,
        min_date,
        max_date,
        row_number() over (
            partition by table
            order by max_date desc, min_date desc, columns
        ) as recency
    from base
    qualify count(*) over (partition by table) = 2
),

-- Columns of the newest signature, one row per column.
latest_columns as (
    select
        table,
        column
    from tables_with_changes
    cross join unnest(split(columns, ',')) as column
    where recency = 1
),

-- Columns of the signature that preceded the newest one.
previous_columns as (
    select
        table,
        column
    from tables_with_changes
    cross join unnest(split(columns, ',')) as column
    where recency = 2
),

-- Present in the newest signature but not in the previous one.
new_columns as (
    select table, column from latest_columns
    except distinct
    select table, column from previous_columns
),

-- Present in the previous signature but not in the newest one.
removed_columns as (
    select table, column from previous_columns
    except distinct
    select table, column from latest_columns
),

-- Date of the change: first time the newest signature was observed. Taken
-- from the same recency ordering that decides new vs. removed, so the date
-- always belongs to the signature the comparison treats as newest.
change_dates as (
    select
        table,
        min_date
    from tables_with_changes
    where recency = 1
),

all_changes as (
    select
        "New column" as change_type,
        table,
        column,
        min_date
    from new_columns
    inner join change_dates using (table)

    union all

    select
        "Removed column" as change_type,
        table,
        column,
        min_date
    from removed_columns
    inner join change_dates using (table)
)

select
    change_type,
    table,
    column,
    min_date
from all_changes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment