Created March 29, 2020 20:12
Change Data Capture
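from typing import List, Sequence, Union

from pyspark.sql import DataFrame, Window


def change_data_capture(
    df: DataFrame,
    partition_col: Union[str, List[str]],
    valid_from_col: str,
    valid_to_col: str,
    capture_columns: Sequence[str] = (),  # assumed default: nothing extra
) -> DataFrame:
    """Change Data Capture between DataFrames.

    Given two DataFrames (unioned), captures the changes in the selected
    `capture_columns`. When two consecutive records contain the same values
    in the selected `capture_columns`, the later record is not captured.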

    Parameters
    ----------
    df : DataFrame
        Data for multiple units of time
    partition_col : Union[str, List[str]]
        The key to window on
    valid_from_col : str
        The column that determines the start valid date
    valid_to_col : str
        The column to determine the last valid date
    capture_columns : list
        The columns for which changes in values are captured

    Notes
    -----
    By default, the columns by which we partition are used for capturing
    changes. Totally new items are always captured; existing items are
    captured only when the specified `capture_columns` have changed compared
    to the previous record within the partition.

    Examples
    --------
    In our DataFrame, we have the following cases:

    - the price of beer is added again on the 5th, unchanged (exclude)
    - a fish is added on the 5th (include)
    - cola has been there since day 1 (include)
    - a couple of bread price changes are added at different times
      (include only if they differ compared to the previous record)
    - the eggs records don't all have a date in the far future; add the `to`
      column to `capture_columns` so these records are captured properly

    >>> df = spark.createDataFrame([
    ...     (0, 'bread', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'a'),
    ...     (1, 'cola', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'e'),
    ...     (2, 'beer', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'c'),
    ...     (3, 'fish', 10., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'd'),
    ...     (2, 'beer', 1., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'g'),
    ...     (0, 'bread', 1.5, '2020-01-05 01:00:00', '9999-12-31 00:00:00', 'b'),
    ...     (0, 'bread', 1., '2020-01-05 02:00:00', '9999-12-31 00:00:00', 'f'),
    ...     (0, 'bread', 1., '2020-01-05 03:00:00', '9999-12-31 00:00:00', 'j'),
    ...     (4, 'eggs', 1., '2020-01-01 00:00:00', '2020-01-02 00:00:00', 'g'),
    ...     (4, 'eggs', 1., '2020-01-02 00:00:00', '2020-01-03 00:00:00', 'h'),
    ...     (4, 'eggs', 1., '2020-01-03 00:00:00', '9999-12-31 00:00:00', 'i')
    ... ], ['bk', 'item', 'price', 'from', 'to', 'noise'])

    Ordering result by `noise` to keep test deterministic.

    >>> change_data_capture(df, 'bk', 'from', 'to', capture_columns=[
    ...     'price', 'to'
    ... ]).sort('noise').show()  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    +---+-----+-----+-------------------+-------------------+-----+
    | bk| item|price|               from|                 to|noise|
    +---+-----+-----+-------------------+-------------------+-----+
    |  0|bread|  1.0|2020-01-01 00:00:00|2020-01-05 01:00:00|    a|
    |  0|bread|  1.5|2020-01-05 01:00:00|2020-01-05 02:00:00|    b|
    |  2| beer|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    c|
    |  3| fish| 10.0|2020-01-05 00:00:00|9999-12-31 00:00:00|    d|
    |  1| cola|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    e|
    |  0|bread|  1.0|2020-01-05 02:00:00|9999-12-31 00:00:00|    f|
    |  4| eggs|  1.0|2020-01-01 00:00:00|2020-01-02 00:00:00|    g|
    |  4| eggs|  1.0|2020-01-02 00:00:00|2020-01-03 00:00:00|    h|
    |  4| eggs|  1.0|2020-01-03 00:00:00|9999-12-31 00:00:00|    i|
    +---+-----+-----+-------------------+-------------------+-----+
    """
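    from functools import reduce

    from pyspark.sql import Column
    from pyspark.sql import functions as f

    # A DataFrame without columns cannot be windowed or compared.
    try:
        _ = df.columns[0]
    except IndexError:
        raise IndexError("Can't handle empty DataFrames")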
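
    # Creating a window over a partition
    # NOTE: `as_list` is not defined in this snippet; it is assumed to wrap a
    # single column name in a list and to return a list unchanged.
    partition_cols = as_list(partition_col)

    # Duplicates are dropped depending on the columns you wish to capture.
    # With no capture columns specified, an arbitrary record is kept per
    # partition key and `valid_from_col`.
    dropduplicates = list(
        {*partition_cols, *capture_columns, valid_from_col})
    w = Window.partitionBy(partition_cols).orderBy(f.col(valid_from_col))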
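
    def drop_trailing_equals(df, *cols):
        """Drop record if previous row has same values in columns."""
        if not cols:
            return df
        for col in cols:
            df = df.withColumn(f'_compare_{col}', f.lag(col).over(w))
        # Keep a row when there is no previous value to compare against
        # (first row of the partition, or a null previous value) or when at
        # least one of the compared columns changed.
        filter_expression = reduce(Column.__or__, (
            (f.col(f'_compare_{col}').isNull() |
             (f.col(col) != f.col(f'_compare_{col}')))
            for col in cols
        ))
        return (
            df
            .filter(filter_expression)
            .drop(*[f'_compare_{col}' for col in cols])
        )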
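
    # Assumed reconstruction of the truncated call: de-duplicate, drop
    # unchanged records, then close each record at the next record's start.
    return drop_trailing_equals(
        df.dropDuplicates(dropduplicates), *capture_columns
    ).withColumn(
        valid_to_col,
        f.coalesce(f.lead(valid_from_col).over(w), f.col(valid_to_col))
    )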
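
The lag/filter/lead logic above can be hard to follow without a Spark session at hand. The following is a minimal pure-Python sketch of the same idea, run on the bread and beer rows from the docstring. The names `capture_changes`, `record` and `FAR_FUTURE` are hypothetical and not part of the gist, and the sketch ignores Spark's null semantics, so treat it as an illustration of the intended behaviour rather than an equivalent implementation.

```python
from itertools import groupby
from operator import itemgetter

FAR_FUTURE = '9999-12-31 00:00:00'


def capture_changes(rows, key, valid_from, valid_to, capture_columns):
    """Hypothetical pure-Python stand-in for the Spark lag/filter/lead steps."""
    captured = []
    # Rough equivalent of Window.partitionBy(key).orderBy(valid_from).
    ordered = sorted(rows, key=itemgetter(key, valid_from))
    for _, group in groupby(ordered, key=itemgetter(key)):
        group = list(group)
        if capture_columns:
            # Keep the first record of a key, and any record that differs
            # from its direct predecessor in at least one captured column.
            kept = [
                row for i, row in enumerate(group)
                if i == 0
                or any(row[c] != group[i - 1][c] for c in capture_columns)
            ]
        else:
            # Without capture columns nothing is filtered out.
            kept = group
        # Close each kept record at the start of the next kept record;
        # the last record of a key keeps its own end date.
        for row, following in zip(kept, kept[1:] + [None]):
            closed = dict(row)
            if following is not None:
                closed[valid_to] = following[valid_from]
            captured.append(closed)
    return captured


def record(bk, price, start):
    """Build one example row that is valid until the far future."""
    return {'bk': bk, 'price': price, 'from': start, 'to': FAR_FUTURE}


rows = [
    record(0, 1.0, '2020-01-01 00:00:00'),  # bread, first record
    record(0, 1.5, '2020-01-05 01:00:00'),  # bread, price changed
    record(0, 1.0, '2020-01-05 02:00:00'),  # bread, price changed back
    record(0, 1.0, '2020-01-05 03:00:00'),  # bread, unchanged
    record(2, 1.0, '2020-01-01 00:00:00'),  # beer, first record
    record(2, 1.0, '2020-01-05 00:00:00'),  # beer, unchanged
]

captured = capture_changes(rows, 'bk', 'from', 'to', ['price', 'to'])
for row in captured:
    print(row)
```

The unchanged bread and beer records are dropped, and each remaining bread record ends where the next one starts, which matches the `a`, `b`, `f` and `c` rows of the expected doctest output.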