Skip to content

Instantly share code, notes, and snippets.

@evantahler
Last active September 21, 2023 01:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save evantahler/fb247d5c51789fa8f059de708aeca520 to your computer and use it in GitHub Desktop.
Save evantahler/fb247d5c51789fa8f059de708aeca520 to your computer and use it in GitHub Desktop.
users-table-type-and-dedupe-annotated
-- RAW TABLE
/*
Landing table for untyped records. Each row carries the record's JSON payload in the
"_airbyte_data" VARIANT column; "_airbyte_loaded_at" remains NULL until the typing/dedupe
transaction in this script has processed the row (PART 5 stamps it).
IDEAS:
* Should this table be a Snowflake Stream?
  * https://docs.snowflake.com/en/sql-reference/sql/create-stream
*/
CREATE OR REPLACE TABLE AIRBYTE_DEVELOP."airbyte_internal"."USERS_RAW" (
    "_airbyte_raw_id"       VARCHAR(16777216) NOT NULL PRIMARY KEY, -- Airbyte-assigned UUID; the same id is copied into the final table row
    "_airbyte_extracted_at" TIMESTAMP_TZ(9)   DEFAULT CURRENT_TIMESTAMP(),
    "_airbyte_loaded_at"    TIMESTAMP_TZ(9),
    "_airbyte_data"         VARIANT
);
-- FINAL TABLE
/*
Typed, deduplicated copy of the raw records: one column per top-level field of "_airbyte_data"
plus Airbyte bookkeeping columns.
All identifiers are quoted upper-case so that they match how the INSERT / DELETE statements in
this script reference them ("V2-INTERNAL-STAGING"."USERS_FINAL", "ID", "UPDATED_AT",
"_AIRBYTE_RAW_ID", ...). Snowflake treats quoted identifiers as case-sensitive by default
(QUOTED_IDENTIFIERS_IGNORE_CASE = FALSE), so lower-case quoted names such as
"v2-internal-staging" or "address" would create a different schema and different columns than
the ones the DML targets.
*/
CREATE OR REPLACE TABLE AIRBYTE_DEVELOP."V2-INTERNAL-STAGING"."USERS_FINAL" (
    "_AIRBYTE_RAW_ID"       VARCHAR(16777216) NOT NULL, -- Added by Airbyte (UUID); links this row to its row in USERS_RAW
    "_AIRBYTE_EXTRACTED_AT" TIMESTAMP_TZ(9)   NOT NULL,
    "_AIRBYTE_META"         VARIANT           NOT NULL, -- Contains errors (and eventually other metadata like sync id, sync time, etc)
    "ADDRESS"               OBJECT,
    "OCCUPATION"            VARCHAR(16777216),
    "GENDER"                VARCHAR(16777216),
    "ACADEMIC_DEGREE"       VARCHAR(16777216),
    "WEIGHT"                NUMBER(38,0),
    "CREATED_AT"            TIMESTAMP_TZ(9),
    "LANGUAGE"              VARCHAR(16777216),
    "TELEPHONE"             VARCHAR(16777216),
    "TITLE"                 VARCHAR(16777216),
    "UPDATED_AT"            TIMESTAMP_TZ(9),
    "NATIONALITY"           VARCHAR(16777216),
    "BLOOD_TYPE"            VARCHAR(16777216),
    "NAME"                  VARCHAR(16777216),
    "ID"                    NUMBER(38,0),
    "AGE"                   NUMBER(38,0),
    "EMAIL"                 VARCHAR(16777216),
    "HEIGHT"                VARCHAR(16777216)
);
-- Phases 1-5 below run inside a single explicit transaction, so their changes are applied
-- together: nothing they write becomes visible until the COMMIT at the end of the script.
/*
General Questions:
* What's the best way to analyze each part of this transaction, both in cost and time?
*/
START TRANSACTION;
-- PHASE 1: BASIC DATA VALIDATION
/*
Guard clause for the whole transaction: if any not-yet-loaded raw row has an "id" that is
absent or cannot be converted to NUMBER, the anonymous Snowflake Scripting block raises the
user-defined exception -20001; otherwise it returns 'SUCCESS'.
The block is dollar-quoted ($$ ... $$) so the single quotes inside it need no escaping.
*/
EXECUTE IMMEDIATE $$
DECLARE
    _ab_missing_primary_key EXCEPTION (-20001, 'Table "airbyte_internal"."USERS_RAW" has rows missing a primary key');
BEGIN
    LET rows_without_pk INTEGER := (
        SELECT COUNT(1)
        FROM "airbyte_internal"."USERS_RAW"
        WHERE "_airbyte_loaded_at" IS NULL
          AND TRY_CAST((GET("_airbyte_data", 'id'))::TEXT AS NUMBER) IS NULL
    );
    IF (rows_without_pk > 0) THEN
        RAISE _ab_missing_primary_key;
    END IF;
    RETURN 'SUCCESS';
END;
$$;
-- PHASE 2: TYPECASTING
/*
Of note:
* Every non-text column is converted with a TRY_* function (TRY_CAST / TRY_TO_TIMESTAMP_TZ) so that
  raw data of the wrong type (it happens more than you think, e.g. 'twenty' for a number column)
  becomes NULL instead of failing the whole transaction. Text and OBJECT/VARIANT columns need no
  safe conversion.
* The timestamp branches use TRY_TO_TIMESTAMP_TZ, not TO_TIMESTAMP_TZ: the regular expressions only
  check the *shape* of a value, so e.g. '2023-13-45T10:00:00+0000' matches a branch, and
  TO_TIMESTAMP_TZ would raise on it and abort the sync. Now such values become NULL and are
  reported in _AIRBYTE_META.errors like any other bad value.
* Each column is cast exactly once, in typed_records. The error array is then built by comparing
  the raw value's TYPEOF with the already-typed column: a non-null raw value whose typed column is
  NULL failed to convert. (Previously every cast expression was repeated a second time for this.)
* `WHERE "_airbyte_loaded_at" IS NULL` restricts the work to raw rows not yet loaded by a previous run.
* Is there a way to avoid the safe casts entirely?
* Building the JSON array of errors (`_AIRBYTE_META.errors=[]`) is known to be slow - can we do this better?
IDEAS
* Would it be faster to build a persisted, other table and then MERGE it into the FINAL table rather than the CTE we are building here?
  * https://docs.snowflake.com/en/sql-reference/sql/merge
* Can we do the date checks without regular expressions?
*/
INSERT INTO "V2-INTERNAL-STAGING"."USERS_FINAL"
(
    "ADDRESS",
    "OCCUPATION",
    "GENDER",
    "ACADEMIC_DEGREE",
    "WEIGHT",
    "CREATED_AT",
    "LANGUAGE",
    "TELEPHONE",
    "TITLE",
    "UPDATED_AT",
    "NATIONALITY",
    "BLOOD_TYPE",
    "NAME",
    "ID",
    "AGE",
    "EMAIL",
    "HEIGHT",
    "_AIRBYTE_META",
    "_AIRBYTE_RAW_ID",
    "_AIRBYTE_EXTRACTED_AT"
)
WITH typed_records AS (
    -- One safe conversion per column; values that cannot be converted become NULL.
    SELECT
        "_airbyte_raw_id",
        "_airbyte_extracted_at",
        "_airbyte_data", -- kept so the error array below can inspect the original JSON types
        CASE
            WHEN TYPEOF("_airbyte_data":"address") = 'OBJECT'
            THEN "_airbyte_data":"address"
        END AS "ADDRESS",
        ("_airbyte_data":"occupation")::TEXT AS "OCCUPATION",
        ("_airbyte_data":"gender")::TEXT AS "GENDER",
        ("_airbyte_data":"academic_degree")::TEXT AS "ACADEMIC_DEGREE",
        TRY_CAST(("_airbyte_data":"weight")::TEXT AS NUMBER) AS "WEIGHT",
        CASE
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
            WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
            ELSE TRY_CAST(("_airbyte_data":"created_at")::TEXT AS TIMESTAMP_TZ)
        END AS "CREATED_AT",
        ("_airbyte_data":"language")::TEXT AS "LANGUAGE",
        ("_airbyte_data":"telephone")::TEXT AS "TELEPHONE",
        ("_airbyte_data":"title")::TEXT AS "TITLE",
        CASE
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
            WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
            THEN TRY_TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
            ELSE TRY_CAST(("_airbyte_data":"updated_at")::TEXT AS TIMESTAMP_TZ)
        END AS "UPDATED_AT",
        ("_airbyte_data":"nationality")::TEXT AS "NATIONALITY",
        ("_airbyte_data":"blood_type")::TEXT AS "BLOOD_TYPE",
        ("_airbyte_data":"name")::TEXT AS "NAME",
        TRY_CAST(("_airbyte_data":"id")::TEXT AS NUMBER) AS "ID",
        TRY_CAST(("_airbyte_data":"age")::TEXT AS NUMBER) AS "AGE",
        ("_airbyte_data":"email")::TEXT AS "EMAIL",
        ("_airbyte_data":"height")::TEXT AS "HEIGHT"
    FROM "airbyte_internal"."USERS_RAW"
    -- only raw rows not yet processed by a previous run
    WHERE "_airbyte_loaded_at" IS NULL
)
SELECT
    "ADDRESS",
    "OCCUPATION",
    "GENDER",
    "ACADEMIC_DEGREE",
    "WEIGHT",
    "CREATED_AT",
    "LANGUAGE",
    "TELEPHONE",
    "TITLE",
    "UPDATED_AT",
    "NATIONALITY",
    "BLOOD_TYPE",
    "NAME",
    "ID",
    "AGE",
    "EMAIL",
    "HEIGHT",
    -- A column is reported when the raw value was present and non-null but its typed value is NULL,
    -- i.e. the conversion above failed. ARRAY_CONSTRUCT_COMPACT drops the NULL (no-problem) entries.
    OBJECT_CONSTRUCT(
        'errors',
        ARRAY_CONSTRUCT_COMPACT(
            CASE WHEN TYPEOF("_airbyte_data":"address") NOT IN ('NULL', 'NULL_VALUE') AND "ADDRESS" IS NULL THEN 'Problem with `address`' END,
            CASE WHEN TYPEOF("_airbyte_data":"occupation") NOT IN ('NULL', 'NULL_VALUE') AND "OCCUPATION" IS NULL THEN 'Problem with `occupation`' END,
            CASE WHEN TYPEOF("_airbyte_data":"gender") NOT IN ('NULL', 'NULL_VALUE') AND "GENDER" IS NULL THEN 'Problem with `gender`' END,
            CASE WHEN TYPEOF("_airbyte_data":"academic_degree") NOT IN ('NULL', 'NULL_VALUE') AND "ACADEMIC_DEGREE" IS NULL THEN 'Problem with `academic_degree`' END,
            CASE WHEN TYPEOF("_airbyte_data":"weight") NOT IN ('NULL', 'NULL_VALUE') AND "WEIGHT" IS NULL THEN 'Problem with `weight`' END,
            CASE WHEN TYPEOF("_airbyte_data":"created_at") NOT IN ('NULL', 'NULL_VALUE') AND "CREATED_AT" IS NULL THEN 'Problem with `created_at`' END,
            CASE WHEN TYPEOF("_airbyte_data":"language") NOT IN ('NULL', 'NULL_VALUE') AND "LANGUAGE" IS NULL THEN 'Problem with `language`' END,
            CASE WHEN TYPEOF("_airbyte_data":"telephone") NOT IN ('NULL', 'NULL_VALUE') AND "TELEPHONE" IS NULL THEN 'Problem with `telephone`' END,
            CASE WHEN TYPEOF("_airbyte_data":"title") NOT IN ('NULL', 'NULL_VALUE') AND "TITLE" IS NULL THEN 'Problem with `title`' END,
            CASE WHEN TYPEOF("_airbyte_data":"updated_at") NOT IN ('NULL', 'NULL_VALUE') AND "UPDATED_AT" IS NULL THEN 'Problem with `updated_at`' END,
            CASE WHEN TYPEOF("_airbyte_data":"nationality") NOT IN ('NULL', 'NULL_VALUE') AND "NATIONALITY" IS NULL THEN 'Problem with `nationality`' END,
            CASE WHEN TYPEOF("_airbyte_data":"blood_type") NOT IN ('NULL', 'NULL_VALUE') AND "BLOOD_TYPE" IS NULL THEN 'Problem with `blood_type`' END,
            CASE WHEN TYPEOF("_airbyte_data":"name") NOT IN ('NULL', 'NULL_VALUE') AND "NAME" IS NULL THEN 'Problem with `name`' END,
            CASE WHEN TYPEOF("_airbyte_data":"id") NOT IN ('NULL', 'NULL_VALUE') AND "ID" IS NULL THEN 'Problem with `id`' END,
            CASE WHEN TYPEOF("_airbyte_data":"age") NOT IN ('NULL', 'NULL_VALUE') AND "AGE" IS NULL THEN 'Problem with `age`' END,
            CASE WHEN TYPEOF("_airbyte_data":"email") NOT IN ('NULL', 'NULL_VALUE') AND "EMAIL" IS NULL THEN 'Problem with `email`' END,
            CASE WHEN TYPEOF("_airbyte_data":"height") NOT IN ('NULL', 'NULL_VALUE') AND "HEIGHT" IS NULL THEN 'Problem with `height`' END
        )
    ) AS "_AIRBYTE_META",
    "_airbyte_raw_id" AS "_AIRBYTE_RAW_ID",
    "_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT"
FROM typed_records;
-- PART 3: DEDUPLICATION
/*
* With this sync's rows now mixed in with the rows already in the final table, keep only the newest
  row per primary key ("ID"): the one with the latest "UPDATED_AT" (NULLs ranked last), ties broken
  by the latest "_AIRBYTE_EXTRACTED_AT". Every other row for that key is deleted.
  Because an update can arrive for any record already in the table, this cannot be time-bounded by
  the cursor or by _airbyte_extracted_at.
IDEAS
* The sub-select can probably be improved. We cannot use the RAW table at this time, because it also still has duplicate entries.
*/
DELETE FROM "V2-INTERNAL-STAGING"."USERS_FINAL"
WHERE "_AIRBYTE_RAW_ID" IN (
    SELECT "_AIRBYTE_RAW_ID"
    FROM "V2-INTERNAL-STAGING"."USERS_FINAL"
    -- QUALIFY filters on the window result directly; rank > 1 means "not the newest row for this ID"
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY "ID"
        ORDER BY "UPDATED_AT" DESC NULLS LAST, "_AIRBYTE_EXTRACTED_AT" DESC
    ) > 1
);
-- PART 4: DEDUPLICATION OF RAW TABLE
/*
The final table now holds exactly one row per primary key, so any raw row whose _airbyte_raw_id no
longer appears there is a superseded duplicate and can be removed from the raw table as well.
NOT EXISTS is used for the anti-join; it matches NOT IN here because both id columns are NOT NULL.
NOTE(review): Snowflake statements run under READ COMMITTED, so raw rows committed by another
session after PHASE 2 started would also be absent from the final table and get deleted here —
confirm nothing writes USERS_RAW while this transaction runs.
IDEAS
* Can this query be combined with the final statement in some way?
*/
DELETE FROM "airbyte_internal"."USERS_RAW"
WHERE NOT EXISTS (
    SELECT 1
    FROM "V2-INTERNAL-STAGING"."USERS_FINAL" AS f
    WHERE f."_AIRBYTE_RAW_ID" = "airbyte_internal"."USERS_RAW"."_airbyte_raw_id"
);
-- PART 5: UPDATE _AIRBYTE_LOADED_AT
/*
* Stamp every still-unloaded raw row with the current time so the next run's
  `"_airbyte_loaded_at" IS NULL` filters (PHASE 1 and PHASE 2) skip it.
* NOTE(review): under READ COMMITTED, raw rows committed by another session after PHASE 2 started
  would also match this predicate and be marked loaded without having been typed — confirm
  USERS_RAW has no concurrent writers during this transaction.
*/
UPDATE "airbyte_internal"."USERS_RAW"
   SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP()
 WHERE "_airbyte_loaded_at" IS NULL;

-- publish all five phases at once
COMMIT;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment