@Menziess
Created March 29, 2020 20:12
Change Data Capture
from functools import reduce

from pyspark.sql import Column, DataFrame, Window
from pyspark.sql import functions as f


def as_list(x):
    """Return `x` if it is already a list, otherwise wrap it in one.

    Helper inferred from its usage below, where `partition_col` may be
    either a single column name or a list of names.
    """
    return x if isinstance(x, list) else [x]


def change_data_capture(
    df: DataFrame,
    partition_col,
    valid_from_col,
    valid_to_col,
    capture_columns=[]
):
"""Change Data Capture between DataFrames.
Given two DataFrames (unioned), captures the changes of the selected
`capture_columns`. When two records that follow eachother contain the
same values in selected `capture_columns`, they are not captured.
Parameters
----------
df : DataFrame
Data for multiple units of time
partition_col : Union[str, List[str]]
The key to window on
valid_from_col : str
The column that determines the start valid date
valid_to_col : str
The column to determine the last valid date
capture_columns : list
The columns for which changes in values are captured
Examples
--------
By default, the columns by which we partition are used for capturing
changes. When we receive totally new items, they are captured,
otherwise they are captured when the specified `capture_columns`
have changed compared toprevious records within a partition.
In our DataFrame, we have three exceptional cases:
- the price of beer is added again on the 5th, unchanged (exclude)
- a fish is added on the 5th (include)
- cola has been there since day 1 (include)
- a couple of bread price changes are added on different days
(include only if they differ compared to the previous record)
- eggs records don't all have a date in the far future, add the `to` column
if these records should be captured properly
    >>> df = spark.createDataFrame([
    ...     (0, 'bread', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'a'),
    ...     (1, 'cola', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'e'),
    ...     (2, 'beer', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'c'),
    ...     (3, 'fish', 10., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'd'),
    ...     (2, 'beer', 1., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'g'),
    ...     (0, 'bread', 1.5, '2020-01-05 01:00:00', '9999-12-31 00:00:00', 'b'),
    ...     (0, 'bread', 1., '2020-01-05 02:00:00', '9999-12-31 00:00:00', 'f'),
    ...     (0, 'bread', 1., '2020-01-05 03:00:00', '9999-12-31 00:00:00', 'j'),
    ...     (4, 'eggs', 1., '2020-01-01 00:00:00', '2020-01-02 00:00:00', 'g'),
    ...     (4, 'eggs', 1., '2020-01-02 00:00:00', '2020-01-03 00:00:00', 'h'),
    ...     (4, 'eggs', 1., '2020-01-03 00:00:00', '9999-12-31 00:00:00', 'i')
    ... ], ['bk', 'item', 'price', 'from', 'to', 'noise'])

    Ordering the result by `noise` to keep the test deterministic.

    >>> change_data_capture(df, 'bk', 'from', 'to', capture_columns=[
    ...     'price', 'to'
    ... ]).sort('noise').show()  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    +---+-----+-----+-------------------+-------------------+-----+
    | bk| item|price|               from|                 to|noise|
    +---+-----+-----+-------------------+-------------------+-----+
    |  0|bread|  1.0|2020-01-01 00:00:00|2020-01-05 01:00:00|    a|
    |  0|bread|  1.5|2020-01-05 01:00:00|2020-01-05 02:00:00|    b|
    |  2| beer|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    c|
    |  3| fish| 10.0|2020-01-05 00:00:00|9999-12-31 00:00:00|    d|
    |  1| cola|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    e|
    |  0|bread|  1.0|2020-01-05 02:00:00|9999-12-31 00:00:00|    f|
    |  4| eggs|  1.0|2020-01-01 00:00:00|2020-01-02 00:00:00|    g|
    |  4| eggs|  1.0|2020-01-02 00:00:00|2020-01-03 00:00:00|    h|
    |  4| eggs|  1.0|2020-01-03 00:00:00|9999-12-31 00:00:00|    i|
    +---+-----+-----+-------------------+-------------------+-----+
    """
    try:
        _ = df.columns[0]
    except IndexError:
        raise IndexError("Can't handle empty DataFrames")

    # Normalize the partition key(s) to a list
    partition_cols = as_list(partition_col)

    # Duplicates are dropped depending on the columns you wish to capture;
    # with no columns specified, a random record per key is captured
    dropduplicates = list(
        set(partition_cols + capture_columns + [valid_from_col]))

    # Window over each key's partition, ordered by validity start date
    w = Window.partitionBy(partition_cols).orderBy(f.col(valid_from_col))

    def drop_trailing_equals(df, *cols):
        """Drop a record if the previous row has the same values in `cols`."""
        if not len(cols):
            return df
        # Pull each capture column's previous value into the current row
        for col in cols:
            df = df.withColumn(f'_compare_{col}', f.lag(col).over(w))
        # Keep a row when it is the first in its partition, or when any
        # capture column differs from the previous row's value
        filter_expression = reduce(Column.__or__, (
            (f.col(f'_compare_{col}').isNull() |
             (f.col(col) != f.col(f'_compare_{col}')))
            for col in cols
        ))
        return (
            df
            .where(filter_expression)
            .drop(*[f'_compare_{col}' for col in cols])
        )

    # Deduplicate, drop unchanged consecutive records, then close each
    # record's validity interval with the next record's start date
    return drop_trailing_equals(
        df.dropDuplicates(dropduplicates),
        *capture_columns
    ).withColumn(
        valid_to_col,
        f.coalesce(f.lead(valid_from_col).over(w), f.col(valid_to_col))
    )
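
As a minimal usage sketch (not part of the original gist), here is the "two DataFrames (unioned)" workflow the docstring refers to. It assumes a live SparkSession named `spark`; the `current` and `updates` names are illustrative, and the columns follow the doctest above.

# Merge an existing history with freshly extracted records before CDC.
current = spark.createDataFrame([
    (0, 'bread', 1.0, '2020-01-01 00:00:00', '9999-12-31 00:00:00'),
], ['bk', 'item', 'price', 'from', 'to'])

updates = spark.createDataFrame([
    (0, 'bread', 1.0, '2020-01-06 00:00:00', '9999-12-31 00:00:00'),  # unchanged price
    (5, 'milk', 0.9, '2020-01-06 00:00:00', '9999-12-31 00:00:00'),   # brand-new item
], ['bk', 'item', 'price', 'from', 'to'])

# The unchanged bread record is dropped, milk is captured as a new key,
# and each record's `to` is closed off by the next record's `from`.
result = change_data_capture(
    current.unionByName(updates), 'bk', 'from', 'to',
    capture_columns=['price'],
)
result.show()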
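
The doctest relies on a `spark` variable being in scope. One possible way to run it, sketched under the assumption of a local Spark installation:

import doctest
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.master('local[2]').getOrCreate()
    # Inject `spark` into the doctest namespace so the example can run
    doctest.testmod(extraglobs={'spark': spark})
    spark.stop()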