# Set arguments
SourceSystemName = "AdventureWorks"
FlowName = "SalesLTAddress"
SourceStorageAccount = "synapsepiethein"
SourceContainer = "synapsedata"
SourcePath = "/landingzone/AdventureWorks/"
TargetStorageAccount = "synapsepiethein"
TargetContainer = "synapsedata"
TargetPath = "/processedzone/AdventureWorks"
SinkOperation = "merge"
PrimaryKey = "AddressID"
%%pyspark
from pyspark import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, BooleanType, DateType
from typing import List
from datetime import datetime, date
from notebookutils import mssparkutils
print("Database: " + SourceSystemName)
print("Table: " + FlowName)
# Exit early when mandatory parameters are missing
if SinkOperation is None:
    mssparkutils.notebook.exit("Exit! No operation type defined!")
if PrimaryKey is None:
    mssparkutils.notebook.exit("Exit! No primary key defined!")
# Read Parquet data from the landing zone location
if 'dataChanged' in locals():
    dataChanged.unpersist()
dataChanged = spark.read.load(SourcePath + "/" + FlowName + '.parquet')
dataChanged.printSchema()
dataChanged.show()
# Capture today's date once for the effective/end date columns
current_date = datetime.today().date()
try:
    # Read original data - this is your SCD type 2 table holding all data
    dataOriginal = spark.read.load(TargetPath + "/" + FlowName, format='delta')
except Exception:
    # First load: no target data exists yet, so initialize it from the newly loaded data
    newOriginalData = dataChanged.withColumn('current', lit(True)).withColumn('effectiveDate', lit(current_date)).withColumn('endDate', lit(date(9999, 12, 31)))
    newOriginalData.write.format("delta").mode("overwrite").save(TargetPath + "/" + FlowName)
    newOriginalData.show()
    newOriginalData.printSchema()
    mssparkutils.notebook.exit("Done loading data! Newly loaded data will be used to generate original data.")
# Prepare for merge: rename columns of the newly loaded data by prefixing them with 'src_'
from pyspark.sql import functions as F
# Capture column names of incoming dataset
columnNames = dataChanged.schema.names
# Rename all columns in dataChanged, prepend src_, and add additional columns
df_new = dataChanged.select([F.col(c).alias("src_"+c) for c in dataChanged.columns])
src_columnNames = df_new.schema.names
df_new2 = df_new.withColumn('src_current', lit(True)).withColumn('src_effectiveDate', lit(current_date)).withColumn('src_endDate', lit(date(9999, 12, 31)))
# Create the dynamic column name for the primary key on the source side
src_primaryKey = 'src_' + PrimaryKey
# Full outer merge: join the existing data to the incoming data on the primary key column
df_merge = dataOriginal.join(df_new2, (df_new2[src_primaryKey] == dataOriginal[PrimaryKey]), how='fullouter')
# Derive a new column to indicate the action to take per record
df_merge = df_merge.withColumn('action',
    when(md5(concat_ws('+', *columnNames)) == md5(concat_ws('+', *src_columnNames)), 'NOACTION')
    .when(df_merge.current == False, 'NOACTION')
    .when(df_merge[src_primaryKey].isNull() & df_merge.current, 'DELETE')
    .when(df_merge[PrimaryKey].isNull(), 'INSERT')
    .otherwise('UPDATE')
)
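# Explanatory note (addition): NOACTION = row is unchanged or already expired,
# DELETE = key no longer exists in the source, so its current row must be closed,
# INSERT = key only exists in the source, so a brand new row is added,
# UPDATE = key exists on both sides with changed attributes, so the current row is
# expired and a new version is inserted.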
# Generate target selections based on action codes
column_names = columnNames + ['current', 'effectiveDate', 'endDate']
src_column_names = src_columnNames + ['src_current', 'src_effectiveDate', 'src_endDate']
# For records that need no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)
# For records that need an insert only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(src_column_names)
# Strip the src_ prefix so the inserted rows match the target schema
df_merge_p2_1 = df_merge_p2.select([F.col(c).alias(c.replace('src_', '', 1)) for c in df_merge_p2.columns])
# For records that need to be deleted: close the current row
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names).withColumn('current', lit(False)).withColumn('endDate', lit(current_date))
# For records that need to be expired and then re-inserted
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPDATE').select(src_column_names)
# Strip the src_ prefix from all columns so the new version matches the target schema
df_merge_p4_2 = df_merge_p4_1.select([F.col(c).alias(c.replace('src_', '', 1)) for c in df_merge_p4_1.columns])
# Expire the existing version: end it the day before the new version becomes effective
df_merge_p4_3 = df_merge.filter(df_merge.action == 'UPDATE').withColumn('endDate', date_sub(df_merge.src_effectiveDate, 1)).withColumn('current', lit(False)).select(column_names)
# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2_1).unionAll(df_merge_p3).unionAll(df_merge_p4_2).unionAll(df_merge_p4_3)
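# Note (addition): unionAll aligns columns by position, not by name, so every branch
# above has to produce its columns in exactly the same order as column_names for the
# final result to line up correctly.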
# Finally, overwrite the existing SCD type 2 table with the merged result
df_merge_final.write.format("delta").mode("overwrite").save(TargetPath + "/" + FlowName)
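# Optional sanity check (a minimal sketch, not part of the original flow): read the
# merged Delta table back and inspect the open, current version per key.
dataMerged = spark.read.load(TargetPath + "/" + FlowName, format='delta')
dataMerged.filter(col('current') == True).orderBy(PrimaryKey).show()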