Created
March 29, 2020 20:12
-
-
Save Menziess/97fbd3551dd61debbfcd62147736f8c1 to your computer and use it in GitHub Desktop.
Change Data Capture
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import Column, DataFrame, Window
from pyspark.sql import functions as f
def change_data_capture(
    df: DataFrame,
    partition_col,
    valid_from_col,
    valid_to_col,
    capture_columns=None
):
    """Change Data Capture between DataFrames.

    Given two DataFrames (unioned), captures the changes of the selected
    `capture_columns`. When two records that follow each other contain the
    same values in selected `capture_columns`, they are not captured.

    Parameters
    ----------
    df : DataFrame
        Data for multiple units of time
    partition_col : Union[str, List[str]]
        The key to window on
    valid_from_col : str
        The column that determines the start valid date
    valid_to_col : str
        The column to determine the last valid date
    capture_columns : list, optional
        The columns for which changes in values are captured. Defaults to
        an empty list, in which case only the partition keys and
        `valid_from_col` deduplicate records.

    Raises
    ------
    IndexError
        If `df` has no columns at all.

    Examples
    --------
    By default, the columns by which we partition are used for capturing
    changes. When we receive totally new items, they are captured,
    otherwise they are captured when the specified `capture_columns`
    have changed compared to previous records within a partition.
    In our DataFrame, we have three exceptional cases:
    - the price of beer is added again on the 5th, unchanged (exclude)
    - a fish is added on the 5th (include)
    - cola has been there since day 1 (include)
    - a couple of bread price changes are added on different days
      (include only if they differ compared to the previous record)
    - eggs records don't all have a date in the far future, add the `to` column
      if these records should be captured properly

    >>> df = spark.createDataFrame([
    ...     (0, 'bread', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'a'),
    ...     (1, 'cola', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'e'),
    ...     (2, 'beer', 1., '2020-01-01 00:00:00', '9999-12-31 00:00:00', 'c'),
    ...     (3, 'fish', 10., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'd'),
    ...     (2, 'beer', 1., '2020-01-05 00:00:00', '9999-12-31 00:00:00', 'g'),
    ...     (0, 'bread', 1.5, '2020-01-05 01:00:00', '9999-12-31 00:00:00', 'b'),
    ...     (0, 'bread', 1., '2020-01-05 02:00:00', '9999-12-31 00:00:00', 'f'),
    ...     (0, 'bread', 1., '2020-01-05 03:00:00', '9999-12-31 00:00:00', 'j'),
    ...     (4, 'eggs', 1., '2020-01-01 00:00:00', '2020-01-02 00:00:00', 'g'),
    ...     (4, 'eggs', 1., '2020-01-02 00:00:00', '2020-01-03 00:00:00', 'h'),
    ...     (4, 'eggs', 1., '2020-01-03 00:00:00', '9999-12-31 00:00:00', 'i')
    ... ], ['bk', 'item', 'price', 'from', 'to', 'noise'])

    Ordering result by `noise` to keep test deterministic.

    >>> change_data_capture(df, 'bk', 'from', 'to', capture_columns=[
    ...     'price', 'to'
    ... ]).sort('noise').show()  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    +---+-----+-----+-------------------+-------------------+-----+
    | bk| item|price|               from|                 to|noise|
    +---+-----+-----+-------------------+-------------------+-----+
    |  0|bread|  1.0|2020-01-01 00:00:00|2020-01-05 01:00:00|    a|
    |  0|bread|  1.5|2020-01-05 01:00:00|2020-01-05 02:00:00|    b|
    |  2| beer|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    c|
    |  3| fish| 10.0|2020-01-05 00:00:00|9999-12-31 00:00:00|    d|
    |  1| cola|  1.0|2020-01-01 00:00:00|9999-12-31 00:00:00|    e|
    |  0|bread|  1.0|2020-01-05 02:00:00|9999-12-31 00:00:00|    f|
    |  4| eggs|  1.0|2020-01-01 00:00:00|2020-01-02 00:00:00|    g|
    |  4| eggs|  1.0|2020-01-02 00:00:00|2020-01-03 00:00:00|    h|
    |  4| eggs|  1.0|2020-01-03 00:00:00|9999-12-31 00:00:00|    i|
    +---+-----+-----+-------------------+-------------------+-----+
    """
    from functools import reduce

    # `None` default avoids the shared-mutable-default pitfall while staying
    # backward compatible: callers passing a list (or []) behave identically.
    capture_columns = list(capture_columns) if capture_columns else []

    # Fail fast on a DataFrame without columns; the IndexError type is kept
    # for backward compatibility with callers that catch it.
    if not df.columns:
        raise IndexError("Can't handle empty DataFrames")

    # Accept a single column name or a list of names (per the docstring's
    # Union[str, List[str]] contract).
    partition_cols = (
        [partition_col] if isinstance(partition_col, str)
        else list(partition_col)
    )

    # Duplicates are dropped depending on the columns you wish to capture.
    # With no capture columns specified, an arbitrary record survives per
    # (partition, valid_from) combination.
    dedup_columns = list(
        set(partition_cols + capture_columns + [valid_from_col]))

    # Window over each partition, ordered by validity start.
    w = Window.partitionBy(partition_cols).orderBy(f.col(valid_from_col))

    def drop_trailing_equals(df, *cols):
        """Drop a record if the previous row has the same values in `cols`."""
        if not cols:
            return df
        for col in cols:
            df = df.withColumn(f'_compare_{col}', f.lag(col).over(w))
        # Keep a row when it is the first of its partition (lag is null),
        # or when any captured column differs from the previous row.
        filter_expression = reduce(Column.__or__, (
            (f.col(f'_compare_{col}').isNull() |
             (f.col(col) != f.col(f'_compare_{col}')))
            for col in cols
        ))
        return (
            df
            .where(filter_expression)
            .drop(*[f'_compare_{col}' for col in cols])
        )

    return drop_trailing_equals(
        df.dropDuplicates(dedup_columns),
        *capture_columns
    ).withColumn(
        # Close each record's validity at the next record's start; the last
        # record in a partition keeps its original `valid_to` value.
        valid_to_col,
        f.coalesce(f.lead(valid_from_col).over(w), f.col(valid_to_col))
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment