# Set arguments
SourceSystemName = "AdventureWorks"
FlowName = "SalesLTAddress"
SourceStorageAccount = "synapsepiethein"
SourceContainer = "synapsedata"
SourcePath = "/landingzone/AdventureWorks/"
TargetStorageAccount = "synapsepiethein"
TargetContainer = "synapsedata"
TargetPath = "/processedzone/AdventureWorks"
SinkOperation = "merge"
PrimaryKey = "AddressID"
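# Illustrative only: in practice these values would typically arrive as notebook
# parameters rather than hard-coded assignments. An orchestrating notebook could
# pass them via mssparkutils.notebook.run (the notebook path below is hypothetical):
# mssparkutils.notebook.run("/ProcessDeltaMerge", 600, {
#     "SourceSystemName": "AdventureWorks",
#     "FlowName": "SalesLTAddress",
#     "SinkOperation": "merge",
#     "PrimaryKey": "AddressID"
# })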
%%pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType
from typing import List
from datetime import datetime, date
from notebookutils import mssparkutils

print("Database: " + SourceSystemName)
print("Table: " + FlowName)

# Validate required arguments before doing any work
if SinkOperation is None:
    mssparkutils.notebook.exit("Exit! No operation type defined!")
if PrimaryKey is None:
    mssparkutils.notebook.exit("Exit! No primary key defined!")

# Read the newly landed Parquet data from the landing zone location
if 'dataChanged' in locals():
    dataChanged.unpersist()
dataChanged = spark.read.load(SourcePath + "/" + FlowName + '.parquet')
dataChanged.printSchema()
dataChanged.show()

current_date = datetime.today().date()
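# Optional (illustrative): capture the incoming row count for simple logging
print("Incoming rows for " + FlowName + ": " + str(dataChanged.count()))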
try:
    # Read the original data - this is the SCD type 2 table holding all history
    dataOriginal = spark.read.load(TargetPath + "/" + FlowName, format='delta')
except Exception:
    # First load: no Delta table exists yet, so initialize it from the incoming data
    newOriginalData = dataChanged.withColumn('current', lit(True)).withColumn('effectiveDate', lit(current_date)).withColumn('endDate', lit(date(9999, 12, 31)))
    newOriginalData.write.format("delta").mode("overwrite").save(TargetPath + "/" + FlowName)
    newOriginalData.show()
    newOriginalData.printSchema()
    mssparkutils.notebook.exit("Done loading data! Newly loaded data has been used to initialize the original data.")
# Prepare for merge: rename the columns of the newly loaded data, prefixing 'src_'
from pyspark.sql import functions as F

# Capture the column names of the incoming dataset
columnNames = dataChanged.schema.names

# Rename all columns in dataChanged with a src_ prefix and add the SCD tracking columns
df_new = dataChanged.select([F.col(c).alias("src_" + c) for c in dataChanged.columns])
src_columnNames = df_new.schema.names
df_new2 = df_new.withColumn('src_current', lit(True)).withColumn('src_effectiveDate', lit(current_date)).withColumn('src_endDate', lit(date(9999, 12, 31)))

# Build the prefixed primary key column name dynamically
src_primaryKey = 'src_' + PrimaryKey
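# Optional sanity check (illustrative): confirm the rename produced the expected
# src_-prefixed columns plus the three SCD tracking columns
print(df_new2.columns)
print(src_primaryKey)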
# FULL outer merge: join on the primary key so the result keeps unmatched rows
# from both sides; historical (non-current) target rows are neutralized below
df_merge = dataOriginal.join(df_new2, (df_new2[src_primaryKey] == dataOriginal[PrimaryKey]), how='fullouter')

# Derive a new column to indicate the action per row:
# NOACTION - record unchanged (identical hash) or already expired
# DELETE   - current record no longer present in the source
# INSERT   - new record, not present in the target yet
# UPDATE   - record changed, so expire the old version and insert the new one
df_merge = df_merge.withColumn('action',
    when(md5(concat_ws('+', *columnNames)) == md5(concat_ws('+', *src_columnNames)), 'NOACTION')
    .when(df_merge.current == False, 'NOACTION')
    .when(df_merge[src_primaryKey].isNull() & df_merge.current, 'DELETE')
    .when(df_merge[PrimaryKey].isNull(), 'INSERT')
    .otherwise('UPDATE')
)
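# Optional sanity check (illustrative): inspect how the rows were classified
# before materializing the per-action selections below
df_merge.groupBy('action').count().show()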
# Generate target selections based on action codes
column_names = columnNames + ['current', 'effectiveDate', 'endDate']
src_column_names = src_columnNames + ['src_current', 'src_effectiveDate', 'src_endDate']
# For records that need no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)

# For records that need insert only; strip the src_ prefix so the columns line up with the target
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(src_column_names)
df_merge_p2_1 = df_merge_p2.select([F.col(c).alias(c[4:]) for c in df_merge_p2.columns])

# For records that need to be deleted: expire the current version
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names).withColumn('current', lit(False)).withColumn('endDate', lit(current_date))

# For records that need to be expired and then inserted: first the new version, with the src_ prefix stripped
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPDATE').select(src_column_names)
df_merge_p4_2 = df_merge_p4_1.select([F.col(c).alias(c[4:]) for c in df_merge_p4_1.columns])

# Then the old version: close it off one day before the new version becomes effective
df_merge_p4_3 = df_merge.filter(df_merge.action == 'UPDATE').withColumn('endDate', date_sub(df_merge.src_effectiveDate, 1)).withColumn('current', lit(False)).select(column_names)
# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2_1).unionAll(df_merge_p3).unionAll(df_merge_p4_2).unionAll(df_merge_p4_3)

# Finally, overwrite the existing Delta table with the merged result
df_merge_final.write.format("delta").mode("overwrite").save(TargetPath + "/" + FlowName)
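# Optional verification (illustrative sketch): read the merged table back and list
# the records that are currently active
dfVerify = spark.read.format("delta").load(TargetPath + "/" + FlowName)
dfVerify.filter(col("current") == True).orderBy(PrimaryKey).show()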