Last active
June 21, 2021 14:57
-
-
Save raven-rock/1c41e173125bcc804ec7e62e99702f30 to your computer and use it in GitHub Desktop.
Example of a PIT-PSA (Point-In-Time Persistent Staging Area) system in PostgreSQL, implemented via Common Table Expressions (CTEs), for efficient ingestion and change data capture (CDC). Suitable as a base for a scriptable star schema.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* NOTES | |
This file contains an efficient, script-ready algorithm to upsert data into a | |
persistent, point-in-time (PIT) staging table (PSA) in PostgreSQL. | |
It is assumed that the PSA will provide a virtually immutable datastore of all
ingested data in its original form, e.g., as it appeared in source CSV files
or source tables at certain points in time. The PSA in turn provides the basis
for dynamically deriving useful analytical datasets, such as star/snowflake | |
schemas, which can be blown away and rebuilt as needed to support ever changing | |
business logic requirements. (Caching via materialized views recommended for | |
performance sake.) | |
This SQL-based algorithm is portable to most other sufficiently advanced RDBMSs
given that they support common table expressions (CTE), "upsert" operations and | |
a hashmap data type such as JSON. Even if those features are not directly | |
supported, they could be emulated by means of a complex series of temporary | |
tables, joins, and awkward comparisons, though the code would be significantly | |
more verbose and difficult to work with due to the lack of expressiveness. | |
There are a lot of often misunderstood and eclectic concepts at play here. So
before working with this example, it is highly recommended that you read and
grok the following material on the what, why, and how of PSAs and PIT tables:
- Persistent staging area: https://www.leapfrogbi.com/why-you-need-a-persistent-staging-area-psa/ | |
- Temporal databases: https://en.wikipedia.org/wiki/Temporal_database | |
- Tuple-versioning: https://en.wikipedia.org/wiki/Tuple-versioning | |
- Kimball methodology slowly changing dimensions: | |
- https://www.kimballgroup.com/2013/02/design-tip-152-slowly-changing-dimension-types-0-4-5-6-7/ | |
- https://en.wikipedia.org/wiki/Slowly_changing_dimension | |
- Log Trigger design pattern (automatic secondary temporal "history" table via triggers): https://en.wikipedia.org/wiki/Log_trigger | |
The PSA table is assumed to follow these conventions for its set of columns and
their names. The "incoming" dataset, i.e., from a CSV file or a source system,
only requires three of them, namely: `nk`, `vtime`, `attrs`.
- `nk`: the value for the natural key of the business dataset. | |
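- `vtime`: valid time - a `timestamp(0)` (one-second precision)
defining when the business attributes are deemed "valid", or true, in the real
world.
- `xtime`: expiry time - a `timestamp(0)` from which the business
attributes are no longer deemed valid. Each version therefore covers the
half-open interval [vtime, xtime). The upsert sets a version's xtime to the
vtime of the next version of the same `nk`. The current version gets the
sentinel '9999-12-31 23:59:59'.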
- `attrs`: business data attributes in JSON format (stored as `text`). Use of JSON permits a
variable set of attributes per entity (natural key), effectively acting as a | |
hybrid document database. The rationale being that if the data model makes | |
non-breaking changes in the form of adding or removing attributes at the | |
entity grain, which happens all the time in the wild, the PSA will just keep | |
on humming. For the most part then, we allow downstream analysts to interpret | |
the meaning and implications of attributes for a given entity at any given | |
time. | |
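- `attrs_digest`: a generated column holding the MD5 hex digest of the `attrs`
text exactly as stored. No key sorting is applied by the database, so
serialize the JSON with keys in a canonical (e.g., alphabetical) order before
loading. Otherwise equivalent documents can produce different digests. Used
to condense business data into an efficiently comparable and indexable form.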
- The actual primary key of the PSA table is {nk, vtime}. | |
Any line in this file containing the special "prototype-only" marker string
(written in all caps) must be removed before productionizing.
Parameters: | |
- `psa_example_table` :: replace this with the tablename of your choice. | |
*/ | |
drop table if exists psa_example_table; -- PROTOTYPE-ONLY
/*
  Idempotently create the target PSA table.

  - (nk, vtime) is the primary key: at most one version of an entity per
    valid-from time.
  - xtime is the exclusive end of a version's validity and must be later than
    vtime. It defaults to the '9999-12-31 23:59:59' sentinel, which the upsert
    in this file also uses to mark the current (open-ended) version.
  - attrs_digest is md5() over the raw attrs text. Two serializations of the
    same JSON that differ in key order or whitespace therefore hash
    differently, so canonicalize attrs before loading if that matters.
  - The xtime > vtime rule spans two columns, so it is declared as a named
    table constraint rather than as a column constraint.
*/
create table if not exists psa_example_table (
    nk            text          not null
  , vtime         timestamp(0)  not null default now()::timestamp(0)
  , xtime         timestamp(0)  not null default '9999-12-31 23:59:59'::timestamp(0)
  , attrs_digest  text          not null generated always as (md5(attrs)) stored
  , attrs         text          not null
  , constraint psa_example_table_pkey primary key (nk, vtime)
  , constraint psa_example_table_xtime_check check (xtime > vtime)
);
-- PROTOTYPE-ONLY: incoming batch. xtime defaults to the open-ended sentinel, so a loaded CSV need not supply it.
create temporary table stg ( -- PROTOTYPE-ONLY
    nk            text          not null -- PROTOTYPE-ONLY
  , vtime         timestamp(0)  not null default now()::timestamp(0) -- PROTOTYPE-ONLY
  , xtime         timestamp(0)  not null default '9999-12-31 23:59:59'::timestamp(0) -- PROTOTYPE-ONLY
  , attrs_digest  text          not null generated always as (md5(attrs)) stored -- PROTOTYPE-ONLY
  , attrs         text          not null -- PROTOTYPE-ONLY
  , constraint stg_pkey primary key (nk, vtime) -- PROTOTYPE-ONLY
  , constraint stg_xtime_check check (xtime > vtime) -- PROTOTYPE-ONLY
); -- PROTOTYPE-ONLY
-- PROTOTYPE-ONLY: seed the previously persisted PSA state. In prod the table already exists and holds that state.
insert into psa_example_table (nk, vtime, xtime, attrs) -- PROTOTYPE-ONLY
values -- PROTOTYPE-ONLY
    ('1-c4ca4238a0b923820dcc509a6f75849b', '2021-05-10 17:44:11', '9999-12-31 23:59:59', 'aa89ddb9-b399-46b1-95bd-9ce80886df667413bbfd-8ef0-4e43-ae48-862f5ca6a67c') -- PROTOTYPE-ONLY
  , ('2-c81e728d9d4c2f636f067f89cc14862c', '2021-05-10 17:44:11', '9999-12-31 23:59:59', 'same, but different vtime') -- PROTOTYPE-ONLY
  , ('3-eccbc87e4b5ce2fe28308fd9f2a7baf3', '2021-05-10 17:44:11', '9999-12-31 23:59:59', '03058cb0-e224-4eaf-9ce3-b7c1970e08d5be95a79f-ffc8-4513-9950-b4fc116b23d8') -- PROTOTYPE-ONLY
; -- PROTOTYPE-ONLY
-- PROTOTYPE-ONLY: stand-in for the incoming batch. In prod, load stg with a PostgreSQL COPY ... FROM STDIN fed by the piped CSV.
insert into stg (nk, vtime, xtime, attrs) -- PROTOTYPE-ONLY
values -- PROTOTYPE-ONLY
    ('2-c81e728d9d4c2f636f067f89cc14862c', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'same, but different vtime') -- PROTOTYPE-ONLY
  , ('3-eccbc87e4b5ce2fe28308fd9f2a7baf3', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'this is modified') -- PROTOTYPE-ONLY
  , ('4-a87ff679a2f3e71d9181a67b7542122c', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'this is new') -- PROTOTYPE-ONLY
; -- PROTOTYPE-ONLY
/*
  Reconcile the incoming batch (stg) with the persisted history of the same
  natural keys, then upsert the result into psa_example_table. The statement
  returns the rows it wrote.

  Steps (per nk, ordered by vtime):
    cat       Incoming rows plus the persisted versions of every nk present in
              the batch. An incoming row replaces a persisted row with the same
              (nk, vtime), so each (nk, vtime) appears once. This assumes stg is
              unique on (nk, vtime), as its prototype primary key enforces.
              Without that guarantee, ON CONFLICT could hit the same target row
              twice.
    flagged   Attach the persisted state of each (nk, vtime), if any, and the
              digest of the preceding version.
    kept      Drop not-yet-persisted rows whose attrs equal the preceding
              version's, because they carry no change. The first row of a run
              is always kept. Persisted rows are always kept because this
              statement never deletes from the PSA (a late-arriving identical
              version may therefore leave two adjacent identical versions, but
              never overlapping ones).
    reconc    Re-derive each surviving version's xtime as the vtime of the next
              surviving version, or the 9999-12-31 23:59:59 sentinel for the
              current one.
    do_upsert Write only rows that are new or whose xtime or attrs changed.

  stg.xtime is not used: expiry is always recomputed from the timeline.
*/
with
cat as (
    select s.nk, s.vtime, s.attrs_digest, s.attrs
    from stg as s
    union all
    select p.nk, p.vtime, p.attrs_digest, p.attrs
    from psa_example_table as p
    where exists (select 1 from stg as s where s.nk = p.nk)
      and not exists (
          select 1 from stg as s where s.nk = p.nk and s.vtime = p.vtime
      )
)
, flagged as (
    select
        c.nk
      , c.vtime
      , c.attrs_digest
      , c.attrs
      , (p.nk is not null) as is_persisted
      , p.xtime as persisted_xtime
      , p.attrs_digest as persisted_attrs_digest
      , lag(c.attrs_digest) over (partition by c.nk order by c.vtime) as prev_attrs_digest
    from cat as c
    left join psa_example_table as p
        on p.nk = c.nk
       and p.vtime = c.vtime
)
, kept as (
    select
        nk
      , vtime
      , attrs_digest
      , attrs
      , is_persisted
      , persisted_xtime
      , persisted_attrs_digest
    from flagged
    -- prev_attrs_digest is null for the first version of an nk, which is thus always kept.
    where is_persisted
       or attrs_digest is distinct from prev_attrs_digest
)
, reconc as (
    select
        nk
      , vtime
      , coalesce(
            lead(vtime) over (partition by nk order by vtime)
          , '9999-12-31 23:59:59'::timestamp(0)
        ) as next_vtime
      , attrs_digest
      , attrs
      , is_persisted
      , persisted_xtime
      , persisted_attrs_digest
    from kept
)
, do_upsert as (
    insert into psa_example_table (nk, vtime, xtime, attrs)
    select nk, vtime, next_vtime, attrs
    from reconc
    where not is_persisted
       or persisted_xtime is distinct from next_vtime
       or persisted_attrs_digest is distinct from attrs_digest
    on conflict (nk, vtime) do update set
        xtime = excluded.xtime
      , attrs = excluded.attrs
    returning nk, vtime, xtime, attrs_digest, attrs
)
select nk, vtime, xtime, attrs_digest, attrs
from do_upsert
;
-- Inspect the resulting PSA state, ordered by primary key so the output is deterministic.
select nk, vtime, xtime, attrs_digest, attrs from psa_example_table order by nk, vtime;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment