Last active
December 8, 2023 21:10
-
-
Save anatolec/63246f72956d8c1f50b5a2827d3b2d95 to your computer and use it in GitHub Desktop.
A dbt analysis that retrieves schema changes from Airbyte's internal data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- dbt analysis: list columns that were added to or removed from each Airbyte
-- stream, by comparing the JSON keys found in Airbyte's raw records over time.
--
-- Configuration (dbt vars; edit the defaults here or pass --vars):
--   airbyte_dataset     - dataset whose relations are scanned; also the prefix
--                         of the raw table names (<dataset>_raw__stream_<table>)
--   airbyte_raw_dataset - dataset holding the raw tables (defaults to
--                         airbyte_internal)
--   bigfunctions_dataset - bigfunctions project.dataset providing json_keys;
--                         pick the one matching your data's region
{% set airbyte_dataset = var('airbyte_dataset', '<<YOUR_DATASET_NAME>>') %}
{% set airbyte_raw_dataset = var('airbyte_raw_dataset', 'airbyte_internal') %}
{% set bigfunctions_dataset = var('bigfunctions_dataset', 'bigfunctions.eu') %}
{% set relations = dbt_utils.get_relations_by_prefix(airbyte_dataset, '') %}
-- base: for every relation, its (at most) two most recently seen column sets.
-- A column set is the sorted, comma-joined list of keys returned by json_keys
-- for _airbyte_data; min_date / max_date are the earliest / latest
-- _airbyte_extracted_at among records having exactly that set.
-- NOTE(review): every relation in airbyte_dataset is assumed to have a matching
-- raw table; a relation without one makes the query fail.
with base as (
{% for relation in relations %}
    (
        select
            "{{ relation.identifier }}" as table,
            array_to_string(
                array(
                    select col
                    from unnest({{ bigfunctions_dataset }}.json_keys(_airbyte_data)) as col
                    order by col  -- sort so key order in the JSON does not matter
                ),
                ','
            ) as columns,
            min(_airbyte_extracted_at) as min_date,
            max(_airbyte_extracted_at) as max_date
        from {{ airbyte_raw_dataset }}.{{ airbyte_dataset }}_raw__stream_{{ relation.identifier }}
        group by columns
        order by max_date desc
        limit 2
    )
    {% if not loop.last %}union all{% endif %}
{% else %}
    -- No relation found: emit an empty result with the same columns so the
    -- downstream CTEs still compile (an empty "with base as ( )" is invalid).
    -- NOTE(review): _airbyte_extracted_at is assumed to be a TIMESTAMP.
    select
        cast(null as string) as table,
        cast(null as string) as columns,
        cast(null as timestamp) as min_date,
        cast(null as timestamp) as max_date
    limit 0
{% endfor %}
),
-- tables_with_changes: keep only tables for which base returned two distinct
-- column sets, i.e. whose recorded keys changed at least once. base keeps at
-- most two rows per table, so "= 2" means "more than one column set".
tables_with_changes as (
    select
        table,
        columns,
        min_date,
        max_date
    from base
    qualify count(*) over (partition by table) = 2
),
-- new_columns: columns present in a table's most recent column set (highest
-- max_date) but absent from its previous one.
new_columns as (
    select
        table,
        column
    from (
        select
            table,
            column
        from tables_with_changes
        cross join unnest(split(columns)) as column
        qualify rank() over (partition by table order by max_date desc) = 1
        except distinct
        select
            table,
            column
        from tables_with_changes
        cross join unnest(split(columns)) as column
        qualify rank() over (partition by table order by max_date asc) = 1
    )
    -- split('') returns [''] for an empty column set; drop that placeholder
    -- here (after ranking) so it is never reported as a column.
    where column != ''
),
-- removed_columns: columns present in a table's previous column set (lowest
-- max_date of the two) but absent from its most recent one.
removed_columns as (
    select
        table,
        column
    from (
        select
            table,
            column
        from tables_with_changes
        cross join unnest(split(columns)) as column
        qualify rank() over (partition by table order by max_date asc) = 1
        except distinct
        select
            table,
            column
        from tables_with_changes
        cross join unnest(split(columns)) as column
        qualify rank() over (partition by table order by max_date desc) = 1
    )
    -- split('') returns [''] for an empty column set; drop that placeholder
    -- here (after ranking) so it is never reported as a column.
    where column != ''
),
-- all_changes: one row per added or removed column, stamped with a date.
-- Each column is joined to both column sets of its table, and the rows from
-- the set with the latest min_date are kept. The reported min_date is
-- therefore the first extraction time of the newer-starting column set.
-- NOTE(review): this orders by min_date, whereas new_columns/removed_columns
-- pick "latest" by max_date; the two can disagree if a table's schema
-- reverted to an earlier column set. Confirm whether that case matters.
all_changes as (
    select
        "New column" as change_type,
        table,
        column,
        min_date
    from new_columns
    inner join tables_with_changes using (table)
    qualify rank() over (partition by table order by min_date desc) = 1
    union all
    select
        "Removed column" as change_type,
        table,
        column,
        min_date
    from removed_columns
    inner join tables_with_changes using (table)
    qualify rank() over (partition by table order by min_date desc) = 1
)
-- Report every detected change, most recent first. The secondary sort keys
-- make the output order deterministic.
select
    change_type,
    table,
    column,
    min_date
from all_changes
order by
    min_date desc,
    table,
    change_type,
    column
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment