Skip to content

Instantly share code, notes, and snippets.

@Whats-A-MattR
Created March 8, 2023 04:16
Show Gist options
  • Save Whats-A-MattR/a47cc99e7fd48fdf9f7d33c63716cb4c to your computer and use it in GitHub Desktop.
Save Whats-A-MattR/a47cc99e7fd48fdf9f7d33c63716cb4c to your computer and use it in GitHub Desktop.
DataBricks DeltaTable Merge() Example, with unique key discovery
# Assumptions;
# you are working with CSVs that include useful headers
# you are using the 'inferSchema' option when reading those CSVs to DataFrames
# you are writing/updating Delta Tables with those DataFrames
import pandas as pd
import pyspark.sql.functions as F
# change other date formats to strings, useful if you get out of bounds errors when casting timestamp[tz, etc.] to timestamp[n]
def date_to_string(df: "DataFrame") -> "DataFrame":
    """Cast every timestamp column of *df* to string, leaving other columns as-is.

    Useful when converting to pandas/Arrow: out-of-range timestamps (e.g.
    timestamp[tz]) can raise out-of-bounds errors, while strings always convert.

    NOTE: the annotations are quoted because ``DataFrame`` is not imported in
    this file (only ``pandas`` and ``pyspark.sql.functions``); an unquoted
    annotation would raise ``NameError`` when the ``def`` statement executes.

    :param df: a Spark DataFrame.
    :return: a new Spark DataFrame with the same column names, where every
        column whose dtype is ``"timestamp"`` has been cast to ``"string"``.
    """
    return df.select(*[
        # df.dtypes yields (column_name, dtype_string) pairs.
        F.col(c).cast("string").alias(c) if t == "timestamp" else F.col(c).alias(c)
        for c, t in df.dtypes
    ])
def locate_unique_keys(df):
    """Heuristically discover columns that uniquely identify rows of *df*.

    Strategy: first look for any single all-distinct column; failing that,
    try every pair of columns. Returns a list of 1 or 2 column names, or an
    empty list when no (single or pairwise) unique key exists in the sample.

    :param df: a Spark DataFrame (converted to pandas internally).
    :return: list of column names forming a unique key, possibly empty.
    """
    # Stringify timestamps first: casting tz-aware/extreme timestamps to
    # pandas can raise out-of-bounds errors (see date_to_string).
    df = date_to_string(df)
    # pandas gives us the per-column `is_unique` utility used below.
    pdf = df.toPandas()

    # Pass 1: a single column whose values are all distinct.
    for col in pdf.columns:
        if pdf[col].is_unique:
            return [col]

    # Pass 2: a pair of columns whose combined values are distinct.
    # BUG FIX: join with a separator — bare concatenation makes distinct
    # pairs like ("ab","c") and ("a","bc") collide, so a valid composite
    # key could be wrongly rejected. "\x1f" (ASCII unit separator) is
    # vanishingly unlikely to appear inside real column values.
    cols = list(pdf.columns)
    for i in range(len(cols)):
        for j in range(i + 1, len(cols)):
            combined = pdf[cols[i]].map(str) + "\x1f" + pdf[cols[j]].map(str)
            if combined.is_unique:
                return [cols[i], cols[j]]

    # No unique key found (callers treat an empty list as "overwrite").
    return []
# usage in a merge function
def mergeDeltaTable(tname, df):
    """Upsert *df* into an existing Delta table using a discovered unique key.

    Builds a merge condition from the unique key column(s) found by
    ``locate_unique_keys``. When no unique key can be found, falls back to
    overwriting the table (acceptable for this author's requirements — adjust
    for yours).

    :param tname: table name (currently unused — the path is hard-coded;
        kept for interface compatibility with existing callers).
    :param df: the Spark DataFrame holding the new/updated rows.
    """
    ukeys = locate_unique_keys(df)
    # Load the target Delta table here (replace '<yourpath>').
    existing_data = DeltaTable.forPath(spark, '<yourpath>')

    # One equality clause per key column, e.g. "data.col1 = updates.col1";
    # multiple clauses are ANDed so composite keys match rows exactly.
    conditions = [f"data.{k} = updates.{k}" for k in ukeys]
    joined_conditions = ' AND '.join(conditions)

    if joined_conditions:
        # BUG FIX: the original referenced an undefined `new_data` instead of
        # the `df` parameter, and mis-cased the API as `whenMatchedupdateAll`.
        existing_data.alias("data") \
            .merge(
                df.alias("updates"),
                joined_conditions
            ) \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
    else:
        # No unique key discovered: overwrite rather than risk a bad merge.
        df.write.mode("overwrite").format("delta").save("<your output directory>")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment