Skip to content

Instantly share code, notes, and snippets.

@maranite
Last active March 15, 2024 04:27
Show Gist options
  • Save maranite/d05a2538515bf23b0957651a7dac9e31 to your computer and use it in GitHub Desktop.
Save maranite/d05a2538515bf23b0957651a7dac9e31 to your computer and use it in GitHub Desktop.
data-versioning.ipynb
Display the source blob
Display the rendered blob
Raw
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": [],
"collapsed_sections": [
"GRaHVQ5eC20I",
"aOF9ADKEdUnY",
"j__8ukDrbMHc",
"QzLsP_8bWXby",
"N5jqu3WQiX8Y",
"U-rRRpCbdwAS",
"6TayR8SAjxGE"
],
"authorship_tag": "ABX9TyMuofCpD0vj+ktmNQrtutUZ",
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/maranite/d05a2538515bf23b0957651a7dac9e31/data-versioning.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"source": [
"# Notebook set up and imports"
],
"metadata": {
"id": "WkmfVR5qhGjR"
}
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"id": "52oedKlQh_Hd",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "e41c78f5-fa68-4516-d946-2d70ba297a3d"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n",
"Requirement already satisfied: pyspark in /usr/local/lib/python3.10/dist-packages (3.4.0)\n",
"Requirement already satisfied: py4j==0.10.9.7 in /usr/local/lib/python3.10/dist-packages (from pyspark) (0.10.9.7)\n"
]
}
],
"source": [
"!pip install pyspark"
]
},
{
"cell_type": "code",
"source": [
"# Import SparkSession\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
"from pyspark.sql.window import Window\n",
"from datetime import date, datetime\n",
"# from delta.tables import *\n",
"\n",
"# Create a Spark Session\n",
"spark = SparkSession.builder.master(\"local[*]\").getOrCreate()\n",
"# Check Spark Session Information\n",
"# spark"
],
"metadata": {
"id": "L6mgR_D6iINm"
},
"execution_count": 17,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Establish some helpful constants and expressions"
],
"metadata": {
"id": "mUrmKbUre4MY"
}
},
{
"cell_type": "code",
"source": [
"# Column name constants\n",
"start_at = \"start_at\"\n",
"end_at = \"end_at\"\n",
"key_name = \"__key\"\n",
"original_start = \"__start_at\"\n",
"original_end = \"__end_at\"\n",
"source_key = \"__skey\"\n",
"target_key = \"__tkey\"\n",
"source_data = \"__sdata\"\n",
"target_data = \"__tdata\"\n",
"\n",
"forever = datetime(2999,12,31, 0,0,0)\n",
"extract_date = datetime(2023,1,1, 0,0,0)\n",
"extracted = lit(extract_date)\n",
"\n",
"source_columns = StructType([\n",
" StructField(\"id\", IntegerType(), False),\n",
" StructField(\"fname\", StringType(), True),\n",
" StructField(\"sname\", StringType(), True),\n",
" StructField(\"dob\", DateType(), True),\n",
"])\n",
"\n",
"primary_key = source_columns[0:1]\n",
"\n",
"scd_columns = StructType([\n",
" StructField(start_at, TimestampType(), False),\n",
" StructField(end_at, TimestampType(), False),\n",
"])\n",
"\n",
"common_cols = [col(c) for c in source_columns.fieldNames()]\n",
"primary_key_cols = [col(c) for c in source_columns[0:1].fieldNames()]\n",
"\n",
"# join predicate expressions\n",
"on_keys = (col(target_key) == col(source_key))\n",
"on_data = (col(source_data) == col(target_data))\n",
"spans_extract_date = (col(start_at) < extracted) & (extracted < col(end_at))"
],
"metadata": {
"id": "sssm2HaIiPY9"
},
"execution_count": 18,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Setup Test Data Sets"
],
"metadata": {
"id": "-fIP0PmcCrOz"
}
},
{
"cell_type": "code",
"source": [
"target = spark.createDataFrame([\n",
" (2, \"Should Be\", \"Replaced \", date(1945,10,4), datetime(2020,12, 1,0,0,0), forever),\n",
" (3, \"History\", \"To Restate \", date(1945,10,4), datetime(2023, 1, 1,0,0,0), forever),\n",
" (4, \"History\", \"To Shorten \", date(1945,10,4), datetime(2020,12, 1,0,0,0), datetime(2023,10,15,0,0,0)),\n",
" (4, \"Current\", \"To Leave \", date(1955,11,5), datetime(2023,10,15,0,0,0), forever),\n",
" (5, \"Should \", \"Be Preceded\", date(1955,11,5), datetime(2023,12,15,0,0,0), forever),\n",
" (6, \"Should \" , \"Not Change\", date(1950,10,4), datetime(2020,12, 1,0,0,0), forever),\n",
" (7, \"History\", \"To Shorten \", date(1998,12,4), datetime(2020,12, 1,0,0,0), datetime(2024,4,15,0,0,0)),\n",
" (7, \"History\", \"To Remain \", date(1995,11,4), datetime(2024, 4,15,0,0,0), forever),\n",
" (8, \"Future \", \"Person \", date(1998,12,4), datetime(2025,12, 1,0,0,0), forever),\n",
" (9, \"Should \", \"Delete \", date(1998,12,4), datetime(2023, 1, 1,0,0,0), forever)\n",
" ],\n",
" StructType([*source_columns , *scd_columns])\n",
")"
],
"metadata": {
"id": "kAw8HDAFql_S"
},
"execution_count": 19,
"outputs": []
},
{
"cell_type": "code",
"source": [
"source = spark.createDataFrame(\n",
" [(1, \"Jenny\", \"New\", date(1998,12,4)),\n",
" (2, \"Intersected\", \"Current B\", date(1945,10,4)),\n",
" (3, \"Restated\", \"History\", date(1945,10,4)),\n",
" (4, \"Intersect\", \"History B\", date(1999,10,4)),\n",
" (5, \"Precede\", \"Past\", date(1900,11,5)),\n",
" (6, \"Should \" , \"Not Change\", date(1950,10,4))\n",
" ],\n",
" source_columns\n",
")"
],
"metadata": {
"id": "mLWFSk6tmK26"
},
"execution_count": 20,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Restructure source to simplify joins"
],
"metadata": {
"id": "GRaHVQ5eC20I"
}
},
{
"cell_type": "code",
"source": [
"_source = source \\\n",
" .select(\n",
" struct(primary_key_cols).alias(source_key),\n",
" struct(common_cols).alias(source_data)\n",
" )\n",
"\n",
"_source.show()"
],
"metadata": {
"id": "lbTIoUMhwMEf",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "81c301d3-0c17-46cf-9391-a4f6335d6aed"
},
"execution_count": 21,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+------+--------------------+\n",
"|__skey| __sdata|\n",
"+------+--------------------+\n",
"| {1}|{1, Jenny, New, 1...|\n",
"| {2}|{2, Intersected, ...|\n",
"| {3}|{3, Restated, His...|\n",
"| {4}|{4, Intersect, Hi...|\n",
"| {5}|{5, Precede, Past...|\n",
"| {6}|{6, Should , Not ...|\n",
"+------+--------------------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Restructure target to simplify joins, retain original date span, etc"
],
"metadata": {
"id": "aOF9ADKEdUnY"
}
},
{
"cell_type": "code",
"source": [
"_target = target \\\n",
" .select(\n",
" struct(primary_key_cols).alias(target_key),\n",
" col(start_at).alias(original_start),\n",
" col(end_at).alias(original_end),\n",
" start_at,\n",
" end_at,\n",
" struct(common_cols).alias(target_data)\n",
" )\n",
"\n",
"_target.show()"
],
"metadata": {
"id": "BtHFhvuqEkOy",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "60aa0932-054c-434b-d416-c908e0eee31f"
},
"execution_count": 22,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+------+-------------------+-------------------+-------------------+-------------------+--------------------+\n",
"|__tkey| __start_at| __end_at| start_at| end_at| __tdata|\n",
"+------+-------------------+-------------------+-------------------+-------------------+--------------------+\n",
"| {2}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2999-12-31 00:00:00|{2, Should Be, Re...|\n",
"| {3}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2999-12-31 00:00:00|{3, History, To R...|\n",
"| {4}|2020-12-01 00:00:00|2023-10-15 00:00:00|2020-12-01 00:00:00|2023-10-15 00:00:00|{4, History, To S...|\n",
"| {4}|2023-10-15 00:00:00|2999-12-31 00:00:00|2023-10-15 00:00:00|2999-12-31 00:00:00|{4, Current, To L...|\n",
"| {5}|2023-12-15 00:00:00|2999-12-31 00:00:00|2023-12-15 00:00:00|2999-12-31 00:00:00|{5, Should , Be P...|\n",
"| {6}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2999-12-31 00:00:00|{6, Should , Not ...|\n",
"| {7}|2020-12-01 00:00:00|2024-04-15 00:00:00|2020-12-01 00:00:00|2024-04-15 00:00:00|{7, History, To S...|\n",
"| {7}|2024-04-15 00:00:00|2999-12-31 00:00:00|2024-04-15 00:00:00|2999-12-31 00:00:00|{7, History, To R...|\n",
"| {8}|2025-12-01 00:00:00|2999-12-31 00:00:00|2025-12-01 00:00:00|2999-12-31 00:00:00|{8, Future , Pers...|\n",
"| {9}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2999-12-31 00:00:00|{9, Should , Dele...|\n",
"+------+-------------------+-------------------+-------------------+-------------------+--------------------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Retirements\n",
"Any existing version record which spans the extract data, where the source record's data differs, must be retired as at the extract date. This dataframe identifies these targets records."
],
"metadata": {
"id": "Wb0cwqO0U4jf"
}
},
{
"cell_type": "code",
"source": [
"to_retire = \\\n",
" _target.where(spans_extract_date) \\\n",
" .join(_source, on_keys & on_data, \"leftanti\") \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start, original_end ,\"start_at\", extracted.alias(\"end_at\"),\n",
" col(target_data + \".*\")\n",
" )\n",
"\n",
"to_retire.show()"
],
"metadata": {
"id": "2w9iGXS7x9HR",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "4e3814d3-d55f-4567-8839-f254b10915f2"
},
"execution_count": 23,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+-------------------+-------------------+---+---------+-----------+----------+\n",
"|key| __start_at| __end_at| start_at| end_at| id| fname| sname| dob|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+---------+-----------+----------+\n",
"|{2}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 2|Should Be| Replaced |1945-10-04|\n",
"|{4}|2020-12-01 00:00:00|2023-10-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 4| History|To Shorten |1945-10-04|\n",
"|{7}|2020-12-01 00:00:00|2024-04-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 7| History|To Shorten |1998-12-04|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+---------+-----------+----------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Restates\n",
"Data is restated when an extract is provided for the exact date of a prior extract, implying that the prior data needs to be restated.\n",
"This data frame collects all versioned target records who's data needs to be overwritten. Note that Deletions handles the edge case where a restating extract omits a record signalling that it needs to be removed from history."
],
"metadata": {
"id": "j__8ukDrbMHc"
}
},
{
"cell_type": "code",
"source": [
"to_update = _target \\\n",
" .where(col(start_at) == extracted) \\\n",
" .join(_source, on_keys , \"inner\" ) \\\n",
" .where( col(source_data) != col(target_data) ) \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start, original_end ,\"start_at\", \"end_at\",\n",
" col(source_data + \".*\"),\n",
" )\n",
"\n",
"to_update.show()"
],
"metadata": {
"id": "6aD32tzML7Bg",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "3c79c943-bfb8-4cf8-ee26-338b3041abc6"
},
"execution_count": 24,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+-------------------+-------------------+---+--------+-------+----------+\n",
"|key| __start_at| __end_at| start_at| end_at| id| fname| sname| dob|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+--------+-------+----------+\n",
"|{3}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2999-12-31 00:00:00| 3|Restated|History|1945-10-04|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+--------+-------+----------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Deletions\n",
"Delete any records that start exactly at the extract date, but are missing in the current extract"
],
"metadata": {
"id": "adEUL4EGWC4M"
}
},
{
"cell_type": "code",
"source": [
"to_delete = _target \\\n",
" .where(col(start_at) == extracted) \\\n",
" .join(_source, on_keys , \"leftanti\") \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start,\n",
" original_end\n",
" )\n",
"\n",
"to_delete.show()"
],
"metadata": {
"id": "Te5WKDXiOWlN",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "271b6785-ffd5-4f68-9673-211197e1dfdf"
},
"execution_count": 25,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+\n",
"|key| __start_at| __end_at|\n",
"+---+-------------------+-------------------+\n",
"|{9}|2023-01-01 00:00:00|2999-12-31 00:00:00|\n",
"+---+-------------------+-------------------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Inserts\n",
"New source records need to be inserted, with their end_at date appropriately set so as not to overlap with any existing future versions."
],
"metadata": {
"id": "QzLsP_8bWXby"
}
},
{
"cell_type": "code",
"source": [
"next_end_ats = target \\\n",
" .withColumn(end_at, explode(array(start_at, end_at))) \\\n",
" .filter(col(end_at) > extracted) \\\n",
" .groupBy(struct(primary_key_cols).alias(target_key)) \\\n",
" .agg(min(col(end_at)).alias(end_at))\n",
"\n",
"\n",
"to_insert = _source \\\n",
" .join(\n",
" _target.where(spans_extract_date),\n",
" on_keys & on_data,\n",
" \"leftanti\") \\\n",
" .join(\n",
" _target,\n",
" on_keys & (col(original_start) == extracted),\n",
" \"leftanti\") \\\n",
" .join(\n",
" next_end_ats,\n",
" on_keys,\n",
" \"leftouter\"\n",
" ) \\\n",
" .select(\n",
" col(source_key).alias(\"key\"),\n",
" extracted.alias(start_at),\n",
" coalesce(col(end_at), lit(forever)).alias(end_at),\n",
" col(source_data + \".*\")\n",
" )\n",
"\n",
"to_insert.show()"
],
"metadata": {
"id": "Yxa1hwxMSirm",
"colab": {
"base_uri": "https://localhost:8080/"
},
"outputId": "aeb6a38a-639a-4814-c29c-ab5c093ef53f"
},
"execution_count": 26,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+---+-----------+---------+----------+\n",
"|key| start_at| end_at| id| fname| sname| dob|\n",
"+---+-------------------+-------------------+---+-----------+---------+----------+\n",
"|{1}|2023-01-01 00:00:00|2999-12-31 00:00:00| 1| Jenny| New|1998-12-04|\n",
"|{2}|2023-01-01 00:00:00|2999-12-31 00:00:00| 2|Intersected|Current B|1945-10-04|\n",
"|{4}|2023-01-01 00:00:00|2023-10-15 00:00:00| 4| Intersect|History B|1999-10-04|\n",
"|{5}|2023-01-01 00:00:00|2023-12-15 00:00:00| 5| Precede| Past|1900-11-05|\n",
"+---+-------------------+-------------------+---+-----------+---------+----------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Assemble a Delta-friendly data frame\n",
"\n",
"Union together the above dataframes to a common _mergable_ frame where:\n",
"\n",
"1. The MERGE join is on key, __orig_start_at , __orig_end_at\n",
"2. Any target rows which join where start_at is null, are deleted\n",
"2. All other target rows where start_at is not null, are updated (for all fields)\n",
"3. All source rows where __orig_start_at is null are inserted.\n",
"\n"
],
"metadata": {
"id": "N5jqu3WQiX8Y"
}
},
{
"cell_type": "code",
"source": [
"delta = to_update \\\n",
" .unionByName(to_retire, allowMissingColumns=True ) \\\n",
" .unionByName(to_delete, allowMissingColumns=True ) \\\n",
" .unionByName(to_insert, allowMissingColumns=True ) \\\n",
" .orderBy(\"key\", coalesce(original_start, \"start_at\"))\n",
"\n",
"delta.show()"
],
"metadata": {
"id": "l6bO8BhPiYNT",
"outputId": "5014ef23-db0c-4444-e982-ea8337f27bec",
"colab": {
"base_uri": "https://localhost:8080/"
}
},
"execution_count": 27,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+\n",
"|key| __start_at| __end_at| start_at| end_at| id| fname| sname| dob|\n",
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+\n",
"|{1}| null| null|2023-01-01 00:00:00|2999-12-31 00:00:00| 1| Jenny| New|1998-12-04|\n",
"|{2}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 2| Should Be| Replaced |1945-10-04|\n",
"|{2}| null| null|2023-01-01 00:00:00|2999-12-31 00:00:00| 2|Intersected| Current B|1945-10-04|\n",
"|{3}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2999-12-31 00:00:00| 3| Restated| History|1945-10-04|\n",
"|{4}|2020-12-01 00:00:00|2023-10-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 4| History|To Shorten |1945-10-04|\n",
"|{4}| null| null|2023-01-01 00:00:00|2023-10-15 00:00:00| 4| Intersect| History B|1999-10-04|\n",
"|{5}| null| null|2023-01-01 00:00:00|2023-12-15 00:00:00| 5| Precede| Past|1900-11-05|\n",
"|{7}|2020-12-01 00:00:00|2024-04-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00| 7| History|To Shorten |1998-12-04|\n",
"|{9}|2023-01-01 00:00:00|2999-12-31 00:00:00| null| null|null| null| null| null|\n",
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Comparing Target, Source and Delta"
],
"metadata": {
"id": "P8_T0c9tS1b4"
}
},
{
"cell_type": "code",
"source": [
"print(\"Existing History\")\n",
"target.show(truncate = False)\n",
"\n",
"print(\"Source\")\n",
"source.show(truncate = False)\n",
"\n",
"print(\"Changes (Delta)\")\n",
"delta \\\n",
" .withColumn(\"Operation\",\n",
" when(col(original_start).isNull(), \"INSERT\") \\\n",
" .when(col(start_at).isNull(), \"DELETE\") \\\n",
" .when((col(original_end) == forever) & (col(end_at) != forever), \"RETIRE\") \\\n",
" .when(col(original_end) != col(end_at), \"CHANGE PERIOD\") \\\n",
" .otherwise(\"RESTATE\") \\\n",
" ) \\\n",
" .show(truncate = False)"
],
"metadata": {
"id": "KrtwB5rcS6oA",
"outputId": "ade4d451-2454-4f7e-deb7-d5cc52b73b24",
"colab": {
"base_uri": "https://localhost:8080/"
}
},
"execution_count": 28,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Existing History\n",
"+---+---------+-----------+----------+-------------------+-------------------+\n",
"|id |fname |sname |dob |start_at |end_at |\n",
"+---+---------+-----------+----------+-------------------+-------------------+\n",
"|2 |Should Be|Replaced |1945-10-04|2020-12-01 00:00:00|2999-12-31 00:00:00|\n",
"|3 |History |To Restate |1945-10-04|2023-01-01 00:00:00|2999-12-31 00:00:00|\n",
"|4 |History |To Shorten |1945-10-04|2020-12-01 00:00:00|2023-10-15 00:00:00|\n",
"|4 |Current |To Leave |1955-11-05|2023-10-15 00:00:00|2999-12-31 00:00:00|\n",
"|5 |Should |Be Preceded|1955-11-05|2023-12-15 00:00:00|2999-12-31 00:00:00|\n",
"|6 |Should |Not Change |1950-10-04|2020-12-01 00:00:00|2999-12-31 00:00:00|\n",
"|7 |History |To Shorten |1998-12-04|2020-12-01 00:00:00|2024-04-15 00:00:00|\n",
"|7 |History |To Remain |1995-11-04|2024-04-15 00:00:00|2999-12-31 00:00:00|\n",
"|8 |Future |Person |1998-12-04|2025-12-01 00:00:00|2999-12-31 00:00:00|\n",
"|9 |Should |Delete |1998-12-04|2023-01-01 00:00:00|2999-12-31 00:00:00|\n",
"+---+---------+-----------+----------+-------------------+-------------------+\n",
"\n",
"Source\n",
"+---+-----------+----------+----------+\n",
"|id |fname |sname |dob |\n",
"+---+-----------+----------+----------+\n",
"|1 |Jenny |New |1998-12-04|\n",
"|2 |Intersected|Current B |1945-10-04|\n",
"|3 |Restated |History |1945-10-04|\n",
"|4 |Intersect |History B |1999-10-04|\n",
"|5 |Precede |Past |1900-11-05|\n",
"|6 |Should |Not Change|1950-10-04|\n",
"+---+-----------+----------+----------+\n",
"\n",
"Changes (Delta)\n",
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+-------------+\n",
"|key|__start_at |__end_at |start_at |end_at |id |fname |sname |dob |Operation |\n",
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+-------------+\n",
"|{1}|null |null |2023-01-01 00:00:00|2999-12-31 00:00:00|1 |Jenny |New |1998-12-04|INSERT |\n",
"|{2}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00|2 |Should Be |Replaced |1945-10-04|RETIRE |\n",
"|{2}|null |null |2023-01-01 00:00:00|2999-12-31 00:00:00|2 |Intersected|Current B |1945-10-04|INSERT |\n",
"|{3}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2999-12-31 00:00:00|3 |Restated |History |1945-10-04|RESTATE |\n",
"|{4}|2020-12-01 00:00:00|2023-10-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00|4 |History |To Shorten |1945-10-04|CHANGE PERIOD|\n",
"|{4}|null |null |2023-01-01 00:00:00|2023-10-15 00:00:00|4 |Intersect |History B |1999-10-04|INSERT |\n",
"|{5}|null |null |2023-01-01 00:00:00|2023-12-15 00:00:00|5 |Precede |Past |1900-11-05|INSERT |\n",
"|{7}|2020-12-01 00:00:00|2024-04-15 00:00:00|2020-12-01 00:00:00|2023-01-01 00:00:00|7 |History |To Shorten |1998-12-04|CHANGE PERIOD|\n",
"|{9}|2023-01-01 00:00:00|2999-12-31 00:00:00|null |null |null|null |null |null |DELETE |\n",
"+---+-------------------+-------------------+-------------------+-------------------+----+-----------+-----------+----------+-------------+\n",
"\n"
]
}
]
},
{
"cell_type": "markdown",
"source": [
"# Bring it all together into a single function"
],
"metadata": {
"id": "U-rRRpCbdwAS"
}
},
{
"cell_type": "code",
"source": [
"def create_changed_data_dataframe( \\\n",
" source : DataFrame,\n",
" target : DataFrame,\n",
" primary_key : List[str],\n",
" extract_date : datetime,\n",
" start_at : str = \"start_at\",\n",
" end_at : str = \"end_at\"\n",
" ):\n",
" '''\n",
" Returns a \"mergable\" dataframe based on compating a\n",
" source dataframe (representing current records at source)\n",
" to a target dataframe (representing historc versioned records)\n",
" '''\n",
" # Column name constants\n",
" key_name = \"__key\"\n",
" original_start = \"__start_at\"\n",
" original_end = \"__end_at\"\n",
" source_key = \"__skey\"\n",
" target_key = \"__tkey\"\n",
" source_data = \"__sdata\"\n",
" target_data = \"__tdata\"\n",
"\n",
" forever = datetime(2999,12,31, 0, 0, 0)\n",
" extracted = lit(extract_date)\n",
"\n",
" # join predicate expressions\n",
" on_keys = (col(target_key) == col(source_key))\n",
" on_data = (col(source_data) == col(target_data))\n",
" spans_extract_date = (col(start_at) < extracted) & (extracted < col(end_at))\n",
"\n",
" # schema helper variables\n",
" scd_columns = StructType([\n",
" StructField(start_at, TimestampType(), False),\n",
" StructField(end_at, TimestampType(), False),\n",
" ])\n",
"\n",
" common_cols = [col(c) for c in source.schema.fieldNames() if c in target.schema.fieldNames()]\n",
" key_cols = [col(c) for c in primary_key]\n",
"\n",
" # Restructure source to simplify joins\n",
" _source = source \\\n",
" .select(\n",
" struct(key_cols).alias(source_key),\n",
" struct(common_cols).alias(source_data)\n",
" )\n",
"\n",
" # Restructure target to simplify joins, retain original date span\n",
" _target = target \\\n",
" .select(\n",
" struct(key_cols).alias(target_key),\n",
" col(start_at).alias(original_start),\n",
" col(end_at).alias(original_end),\n",
" start_at,\n",
" end_at,\n",
" struct(common_cols).alias(target_data)\n",
" )\n",
"\n",
" # Any existing version record which spans the extract data,\n",
" # where the source record's data differs, must be retired\n",
" # as at the extract date.\n",
" to_retire = \\\n",
" _target.where(spans_extract_date) \\\n",
" .join(_source, on_keys & on_data, \"leftanti\") \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start, original_end ,\"start_at\", extracted.alias(\"end_at\"),\n",
" col(target_data + \".*\")\n",
" )\n",
"\n",
" # Data is restated when an extract is provided for the exact date of\n",
" # a prior extract, implying that the prior data needs to be restated.\n",
" # This data frame collects all versioned target records who's data\n",
" # needs to be overwritten.\n",
" to_update = _target \\\n",
" .where(col(start_at) == extracted) \\\n",
" .join(_source, on_keys , \"inner\" ) \\\n",
" .where( col(source_data) != col(target_data) ) \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start, original_end ,\"start_at\", \"end_at\",\n",
" col(source_data + \".*\"),\n",
" )\n",
"\n",
" # Version records belonging to chains that began before the extract date,\n",
" # but where the record is missing in this extract, need to be truncated as\n",
" # of the extract date. This means that all more recent version records\n",
" # need to be deleted, and these to-delete target records are determined here.\n",
" # to_delete = _target \\\n",
" # .withColumn(\n",
" # \"__first_start\",\n",
" # min(col(start_at)).over(Window.partitionBy(col(target_key)))) \\\n",
" # .where((col(start_at) >= extracted) & (col(\"__first_start\") < extracted)) \\\n",
" # .join(_source, on_keys , \"leftanti\") \\\n",
" # .select(\n",
" # col(target_key).alias(\"key\"),\n",
" # original_start,\n",
" # original_end\n",
" # )\n",
"\n",
" # revision: Version records starting at the extract date, which are now\n",
" # missing in the current extract, need to be delete.\n",
" to_delete = _target \\\n",
" .where(col(start_at) == extracted) \\\n",
" .join(_source, on_keys , \"leftanti\") \\\n",
" .select(\n",
" col(target_key).alias(\"key\"),\n",
" original_start,\n",
" original_end\n",
" )\n",
"\n",
" # Look at existing future hsitory records and calculate\n",
" # the next end_at date for each key.\n",
" next_end_ats = target \\\n",
" .withColumn(end_at, explode(array(start_at, end_at))) \\\n",
" .filter(col(end_at) > extracted) \\\n",
" .groupBy(struct(key_cols).alias(target_key)) \\\n",
" .agg(min(col(end_at)).alias(end_at))\n",
"\n",
" # Identify new source records to be inserted, with their end_at\n",
" # date appropriately set so as not to overlap with any\n",
" # existing records starting in the future.\n",
" to_insert = _source \\\n",
" .join(\n",
" _target.where(spans_extract_date),\n",
" on_keys & on_data,\n",
" \"leftanti\") \\\n",
" .join(\n",
" _target,\n",
" on_keys & (col(original_start) == extracted),\n",
" \"leftanti\") \\\n",
" .join(\n",
" next_end_ats,\n",
" on_keys,\n",
" \"leftouter\"\n",
" ) \\\n",
" .select(\n",
" col(source_key).alias(\"key\"),\n",
" extracted.alias(start_at),\n",
" coalesce(col(end_at), lit(forever)).alias(end_at),\n",
" col(source_data + \".*\")\n",
" )\n",
"\n",
" # union all the dataframes into a single delta-friendly\n",
" # result set\n",
" delta = to_update \\\n",
" .unionByName(to_retire, allowMissingColumns=True ) \\\n",
" .unionByName(to_delete, allowMissingColumns=True ) \\\n",
" .unionByName(to_insert, allowMissingColumns=True )\n",
"\n",
" return delta"
],
"metadata": {
"id": "Jr3Goc9advMJ"
},
"execution_count": 29,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Test the new function"
],
"metadata": {
"id": "6TayR8SAjxGE"
}
},
{
"cell_type": "code",
"source": [
"create_changed_data_dataframe(\n",
" source,\n",
" target,\n",
" [\"id\"],\n",
" datetime(2023,1,31, 0, 0, 0)\n",
" ).show()"
],
"metadata": {
"id": "yFwFHBrghPpV",
"outputId": "d7afab6d-515e-4216-9628-a8a080e0d723",
"colab": {
"base_uri": "https://localhost:8080/"
}
},
"execution_count": 30,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---+-------------------+-------------------+-------------------+-------------------+---+-----------+-----------+----------+\n",
"|key| __start_at| __end_at| start_at| end_at| id| fname| sname| dob|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+-----------+-----------+----------+\n",
"|{3}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2023-01-31 00:00:00| 3| History|To Restate |1945-10-04|\n",
"|{2}|2020-12-01 00:00:00|2999-12-31 00:00:00|2020-12-01 00:00:00|2023-01-31 00:00:00| 2| Should Be| Replaced |1945-10-04|\n",
"|{4}|2020-12-01 00:00:00|2023-10-15 00:00:00|2020-12-01 00:00:00|2023-01-31 00:00:00| 4| History|To Shorten |1945-10-04|\n",
"|{7}|2020-12-01 00:00:00|2024-04-15 00:00:00|2020-12-01 00:00:00|2023-01-31 00:00:00| 7| History|To Shorten |1998-12-04|\n",
"|{9}|2023-01-01 00:00:00|2999-12-31 00:00:00|2023-01-01 00:00:00|2023-01-31 00:00:00| 9| Should |Delete |1998-12-04|\n",
"|{1}| null| null|2023-01-31 00:00:00|2999-12-31 00:00:00| 1| Jenny| New|1998-12-04|\n",
"|{3}| null| null|2023-01-31 00:00:00|2999-12-31 00:00:00| 3| Restated| History|1945-10-04|\n",
"|{5}| null| null|2023-01-31 00:00:00|2023-12-15 00:00:00| 5| Precede| Past|1900-11-05|\n",
"|{4}| null| null|2023-01-31 00:00:00|2023-10-15 00:00:00| 4| Intersect| History B|1999-10-04|\n",
"|{2}| null| null|2023-01-31 00:00:00|2999-12-31 00:00:00| 2|Intersected| Current B|1945-10-04|\n",
"+---+-------------------+-------------------+-------------------+-------------------+---+-----------+-----------+----------+\n",
"\n"
]
}
]
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment