Skip to content

Instantly share code, notes, and snippets.

@oolongtea
Last active May 6, 2024 17:27
Show Gist options
  • Save oolongtea/694bd25b0d0a906a4821270606f1cdf7 to your computer and use it in GitHub Desktop.
Save oolongtea/694bd25b0d0a906a4821270606f1cdf7 to your computer and use it in GitHub Desktop.
merge into vimeo.video t
using (
select
t.record_content:op::text as op
-- for the 'd' case, the 'id' will only come through in the 'before' section
, coalesce(t.record_content:before.id::integer, t.record_content:after.id::integer) as id
, t.record_content:after.user_id::integer as user_id
, t.record_content:after.video_title::text as video_title
, to_timestamp_tz(t.record_content:after.created_at::integer, 3) as created_at
from vimeo.video_stream t
where t.record_content:op::text in ('u', 'c', 'd')
and coalesce(t.record_content:before.id::integer, t.record_content:after.id::integer) is not null
-- handle when a batch of records has multiple operations for one record (update, insert, delete)
-- in this case, take the most recent based on primary key
qualify row_number() over (partition by coalesce(t.record_content:before.id::integer, t.record_content:after.id::integer)
-- combine ts_ms and pos since they may be duplicated in the debezium logs
order by (t.record_content:ts_ms::integer*100000000000000 + t.record_content:source.pos::integer) desc nulls last
) = 1
) s on s.id = t.id
when matched and op = 'd'
then delete
-- here we assume that if a record is deleted then recreated immediately, we should handle that as an update operation
when matched and (op = 'u' or op = 'c')
then update set t.created_at = s.created_at
, t.user_id = s.user_id
, t.video_title = s.video_title
when not matched and op != 'd'
then insert (id, created_at, user_id, video_title)
values (t.id, t.created_at, t.user_id, t.video_title)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment