@raven-rock
Last active June 21, 2021 14:57
Example of a PIT-PSA (Point-In-Time Persistent Staging Area) system in PostgreSQL, implemented via Common Table Expressions (CTEs), for efficient ingestion and change data capture (CDC). Suitable as a base for a scriptable star schema.
/* NOTES
This file contains an efficient, script-ready algorithm to upsert data into a
persistent, point-in-time (PIT) staging table (PSA) in PostgreSQL.
It is assumed that the PSA will provide a virtually immutable datastore of all
ingested data in its original form, e.g., as it appeared in source CSV files
or source tables at certain points in time. The PSA in turn provides the basis
for dynamically deriving useful analytical datasets, such as star/snowflake
schemas, which can be blown away and rebuilt as needed to support ever-changing
business logic requirements. (Caching via materialized views is recommended for
performance's sake.)
This SQL-based algorithm is portable to most other sufficiently advanced RDBMSs,
provided they support common table expressions (CTEs), "upsert" operations, and
a hashmap data type such as JSON. Even if those features are not directly
supported, they could be emulated by means of a complex series of temporary
tables, joins, and awkward comparisons, though the code would be significantly
more verbose and difficult to work with due to the lack of expressiveness.
There are a lot of often misunderstood and eclectic concepts at play here, so
before working with this example, it is highly recommended that you read and
grok the following material on the what, why, and how of PSAs and PIT tables:
- Persistent staging area: https://www.leapfrogbi.com/why-you-need-a-persistent-staging-area-psa/
- Temporal databases: https://en.wikipedia.org/wiki/Temporal_database
- Tuple-versioning: https://en.wikipedia.org/wiki/Tuple-versioning
- Kimball methodology slowly changing dimensions:
- https://www.kimballgroup.com/2013/02/design-tip-152-slowly-changing-dimension-types-0-4-5-6-7/
- https://en.wikipedia.org/wiki/Slowly_changing_dimension
- Log Trigger design pattern (automatic secondary temporal "history" table via triggers): https://en.wikipedia.org/wiki/Log_trigger
The PSA table is assumed to follow the conventions below for the set and naming
of its columns. The "incoming" dataset, i.e., as from a CSV file or source
system, only requires three of them, namely: `nk`, `vtime`, and `attrs`.
- `nk`: the value for the natural key of the business dataset.
- `vtime`: valid time - a `timestamp(0)` (one-second precision)
defining when the business attributes are deemed "valid", or true, in the real
world.
- `xtime`: expiry time - a `timestamp(0)` defining when the business
attributes are deemed no longer valid.
- `attrs`: Business data attributes in JSON format. Use of JSON permits a
variable set of attributes per entity (natural key), effectively acting as a
hybrid document database. The rationale is that if the data model makes
non-breaking changes in the form of adding or removing attributes at the
entity grain, which happens all the time in the wild, the PSA will just keep
on humming. For the most part then, we allow downstream analysts to interpret
the meaning and implications of attributes for a given entity at any given
time.
- `attrs_digest`: a generated column holding the MD5 hash digest of the
JSON `attrs` value (with keys sorted alphabetically). Used to condense
business data into an efficiently indexable, comparable form.
- The actual primary key of the PSA table is {nk, vtime}.
Any line in this file containing the special "prototype-only" string in all caps
must be removed before productionizing.
Parameters:
- `psa_example_table` :: replace this with the tablename of your choice.
*/
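/* Sketch (assumption, not part of the prototype): the notes above call for the
digest to be computed over the JSON attrs with keys sorted alphabetically. The
prototype below stores `attrs` as plain text and hashes it verbatim; if `attrs`
were instead stored as `jsonb`, the digest would be insensitive to key order,
because jsonb normalizes object keys on input:

    select md5('{"b":2,"a":1}'::jsonb::text) = md5('{"a":1,"b":2}'::jsonb::text);
    -- returns true

e.g., a generated column such as
`attrs_digest text generated always as (md5(attrs::text)) stored`
over a jsonb `attrs` column. */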
drop table if exists psa_example_table; -- PROTOTYPE-ONLY
/* Idempotently create the target PSA table. */
create table if not exists psa_example_table (
nk text not null
, vtime timestamp(0) not null default now()::timestamp(0)
, xtime timestamp(0) not null check (xtime > vtime)
, attrs_digest text not null generated always as (md5(attrs)) stored
, attrs text not null
, primary key (nk, vtime)
);
create temporary table stg ( -- PROTOTYPE-ONLY
nk text not null -- PROTOTYPE-ONLY
, vtime timestamp(0) not null default now()::timestamp(0) -- PROTOTYPE-ONLY
, xtime timestamp(0) not null check (xtime > vtime) -- PROTOTYPE-ONLY
, attrs_digest text not null generated always as (md5(attrs)) stored -- PROTOTYPE-ONLY
, attrs text not null -- PROTOTYPE-ONLY
, primary key (nk, vtime) -- PROTOTYPE-ONLY
); -- PROTOTYPE-ONLY
/* PROTOTYPE-ONLY: psa_example_table data. In prod you will not need this as the table will already be created and contain previously persisted state. */
INSERT INTO psa_example_table ("nk", "vtime", "xtime", "attrs") VALUES -- PROTOTYPE-ONLY
('1-c4ca4238a0b923820dcc509a6f75849b', '2021-05-10 17:44:11', '9999-12-31 23:59:59', 'aa89ddb9-b399-46b1-95bd-9ce80886df667413bbfd-8ef0-4e43-ae48-862f5ca6a67c'), -- PROTOTYPE-ONLY
('2-c81e728d9d4c2f636f067f89cc14862c', '2021-05-10 17:44:11', '9999-12-31 23:59:59', 'same, but different vtime'), -- PROTOTYPE-ONLY
('3-eccbc87e4b5ce2fe28308fd9f2a7baf3', '2021-05-10 17:44:11', '9999-12-31 23:59:59', '03058cb0-e224-4eaf-9ce3-b7c1970e08d5be95a79f-ffc8-4513-9950-b4fc116b23d8') -- PROTOTYPE-ONLY
; -- PROTOTYPE-ONLY
/* PROTOTYPE-ONLY: replace this with a PG 'COPY' command to read CSV from piped command. */
INSERT INTO stg ("nk", "vtime", "xtime", "attrs") VALUES -- PROTOTYPE-ONLY
('2-c81e728d9d4c2f636f067f89cc14862c', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'same, but different vtime'), -- PROTOTYPE-ONLY
('3-eccbc87e4b5ce2fe28308fd9f2a7baf3', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'this is modified'), -- PROTOTYPE-ONLY
('4-a87ff679a2f3e71d9181a67b7542122c', '2021-06-11 00:00:00', '9999-12-31 23:59:59', 'this is new') -- PROTOTYPE-ONLY
; -- PROTOTYPE-ONLY
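/* Sketch of a production staging load (assumption: `extract-source-data.sh` is
a hypothetical command that emits CSV with a header row). The PROTOTYPE-ONLY
insert above would be replaced with something like:

    copy stg (nk, vtime, xtime, attrs)
    from program 'extract-source-data.sh'
    with (format csv, header true);

Note that `copy ... from program` runs server-side and requires superuser or
the pg_execute_server_program role; from psql, `\copy ... from pstdin` is the
client-side equivalent for piped input. */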
WITH
cat as (
select nk, vtime, xtime, attrs_digest, attrs
from stg
union
select nk, vtime, xtime, attrs_digest, attrs
from psa_example_table
where exists (select 1 from stg where nk = psa_example_table.nk)
)
,
reconc as (
/*
Reconcile the staged rows against each natural key's existing history.
Window functions expose the neighboring row's vtime and attrs_digest so we
can omit rows whose attributes are unchanged, re-expire rows superseded by
a newer version, and flag genuinely new versions for insertion.
*/
select
nk
, vtime
, xtime
, coalesce(lead(vtime,1) over (partition by nk order by vtime),'9999-12-31 23:59:59'::timestamp(0)) AS next_vtime
, attrs_digest
, coalesce(lag(attrs_digest,1) over (partition by nk order by vtime),'x') AS prev_attrs_digest
, coalesce(lead(attrs_digest,1) over (partition by nk order by vtime),'x') AS next_attrs_digest
, attrs
, case
when
( attrs_digest = /* prev_attrs_digest */ coalesce(lag(attrs_digest,1) over (partition by nk order by vtime),'x')
OR
attrs_digest = /* next_attrs_digest */ coalesce(lead(attrs_digest,1) over (partition by nk order by vtime),'x')
) then 'omit:same-attrs'
when xtime <> /* next_vtime */ coalesce(lead(vtime,1) over (partition by nk order by vtime),'9999-12-31 23:59:59'::timestamp(0)) then 'upsert:modified'
when xtime = /* next_vtime */ coalesce(lead(vtime,1) over (partition by nk order by vtime),'9999-12-31 23:59:59'::timestamp(0)) then 'upsert:new'
else 'huh?'
end
AS upsert_status
from cat
)
,
upserts as (
select
nk
, vtime
, next_vtime as xtime
, attrs
, upsert_status
from reconc
)
,
do_upsert as (
insert into psa_example_table (nk, vtime, xtime, attrs)
select nk, vtime, xtime, attrs from upserts where upsert_status like 'upsert:%'
on conflict (nk, vtime) do update set
xtime = excluded.xtime
, attrs = excluded.attrs
returning *
)
select * from do_upsert
;
select * from psa_example_table;
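/* Example consumption query (a sketch; the timestamp is illustrative):
reconstruct every entity's attributes as they were valid at a given moment. */
select nk, attrs
from psa_example_table
where vtime <= timestamp '2021-06-11 00:00:00'
and xtime > timestamp '2021-06-11 00:00:00';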