Created
December 5, 2023 17:57
-
-
Save RyanJulyan/8a4a0ef626676b739cef813f9ae4ef37 to your computer and use it in GitHub Desktop.
This script reconciles two pandas DataFrames based on specified criteria. It efficiently compares DataFrame rows based on key columns, handling both numeric and textual differences with specified tolerances. It includes a Levenshtein-distance comparison for strings.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
from typing import Any, List | |
import pandas as pd | |
def lev_dist(a: str, b: str) -> float:
    """
    Calculate the Levenshtein distance between two input strings.

    The distance is the minimum number of single-character insertions,
    deletions and substitutions needed to turn ``a`` into ``b``.

    The computation is iterative (two-row dynamic programming), so it does
    not depend on the interpreter's recursion limit and works for long
    strings.

    Args:
        a (str): The first string to compare.
        b (str): The second string to compare.

    Returns:
        float: The distance between strings a and b. The value is always a
            whole number and is returned as an ``int``.

    Example:
        a = 'stamp'
        b = 'stomp'
        lev_dist(a, b)  # Returns: 1
    """
    if a == b:
        return 0

    # A shared prefix or suffix never contributes to the distance, so strip
    # both before running the quadratic part of the algorithm.
    start = 0
    shortest = min(len(a), len(b))
    while start < shortest and a[start] == b[start]:
        start += 1
    end_a, end_b = len(a), len(b)
    while end_a > start and end_b > start and a[end_a - 1] == b[end_b - 1]:
        end_a -= 1
        end_b -= 1
    a, b = a[start:end_a], b[start:end_b]

    # One side is exhausted: only insertions (or only deletions) remain.
    if not a or not b:
        return len(a) + len(b)

    # The distance is symmetric; keep the shorter string as the row so the
    # working lists stay as small as possible.
    if len(a) < len(b):
        a, b = b, a

    # previous[j] holds the distance between the processed part of `a`
    # and the first j characters of `b`.
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(
                min(
                    current[j - 1] + 1,  # insert character
                    previous[j] + 1,  # delete character
                    previous[j - 1] + (char_a != char_b),  # replace / keep
                )
            )
        previous = current
    return previous[-1]
def _compare_values(val1, val2, num_tolerance, lev_dist_tolerance):
    """Classify one pair of non-missing values as 'Matched', 'Matched within tolerance' or 'Unmatched'."""
    if val1 == val2:
        return 'Matched'

    is_number = pd.api.types.is_number
    is_bool = pd.api.types.is_bool
    # Booleans are numbers to Python, but a numeric tolerance is meaningless
    # for them, so they only ever match exactly.
    both_numeric = (is_number(val1) and not is_bool(val1)
                    and is_number(val2) and not is_bool(val2))

    if both_numeric:
        within = abs(val1 - val2) <= num_tolerance
    elif isinstance(val1, str) and isinstance(val2, str):
        within = lev_dist(val1, val2) <= lev_dist_tolerance
    else:
        # Mixed or unsupported types (e.g. text vs. number) cannot be
        # measured against either tolerance; unequal means unmatched.
        within = False
    return 'Matched within tolerance' if within else 'Unmatched'


def generic_reconcile(df1: pd.DataFrame,
                      df2: pd.DataFrame,
                      key_columns: List[str],
                      comparison_columns: List[str] = None,
                      num_tolerance: float = 0,
                      lev_dist_tolerance: int = 0) -> pd.DataFrame:
    """
    Reconciles two dataframes based on specified key columns.

    The frames are outer-merged on ``key_columns``; columns present in both
    frames get the suffixes ``_1`` (from ``df1``) and ``_2`` (from ``df2``).
    Every merged row then receives a ``Status``:

    * ``'Unmatched in DataFrame 1'`` - the key exists only in ``df1``.
    * ``'Unmatched in DataFrame 2'`` - the key exists only in ``df2``.
    * ``'Matched'`` - every compared column is equal.
    * ``'Matched within tolerance'`` - no compared column is out of
      tolerance, and at least one differs within it.
    * ``'Unmatched'`` - at least one compared column is out of tolerance,
      or no column could be compared at all.

    A column is skipped for a row when either side is missing (NaN).

    Args:
        df1 (pd.DataFrame): First DataFrame.
        df2 (pd.DataFrame): Second DataFrame.
        key_columns (List[str]): List of columns to match on.
        comparison_columns (List[str]): Optional. List of columns to compare
            for differences. Each must exist in both frames. Defaults to the
            non-key numeric columns of ``df1`` that also exist in ``df2``.
        num_tolerance (float): Optional. Maximum absolute difference for
            numeric comparisons.
        lev_dist_tolerance (int): Optional. Maximum Levenshtein distance for
            text comparisons.

    Returns:
        pd.DataFrame: DataFrame with matched and unmatched records, and a
            status column.
    """
    # Merge dataframes
    merged_df = pd.merge(df1,
                         df2,
                         on=key_columns,
                         how='outer',
                         indicator=True,
                         suffixes=('_1', '_2'))

    # Define comparison columns if not provided. Only columns present in
    # both frames receive the _1/_2 suffixes, so others cannot be compared.
    if comparison_columns is None:
        comparison_columns = [
            col for col in df1.columns
            if col not in key_columns and col in df2.columns
            and pd.api.types.is_numeric_dtype(df1[col])
        ]

    def compare_row(row):
        if row['_merge'] != 'both':
            return ('Unmatched in DataFrame 1'
                    if row['_merge'] == 'left_only' else
                    'Unmatched in DataFrame 2')

        compared_any = False
        within_tolerance = False
        for col in comparison_columns:
            val1, val2 = row[f'{col}_1'], row[f'{col}_2']
            if pd.isna(val1) or pd.isna(val2):
                continue  # Skip comparison if either value is NaN
            outcome = _compare_values(val1, val2, num_tolerance,
                                      lev_dist_tolerance)
            if outcome == 'Unmatched':
                # One out-of-tolerance column makes the whole row unmatched.
                return 'Unmatched'
            compared_any = True
            if outcome == 'Matched within tolerance':
                within_tolerance = True

        if not compared_any:
            return 'Unmatched'
        return 'Matched within tolerance' if within_tolerance else 'Matched'

    # Build the statuses as a plain list so an empty merge result still
    # yields a (zero-length) column instead of an empty DataFrame.
    statuses = [compare_row(row) for _, row in merged_df.iterrows()]
    merged_df['Status'] = pd.Series(statuses,
                                    index=merged_df.index,
                                    dtype=object)
    return merged_df.drop(columns=['_merge'])
if __name__ == '__main__':
    # Example usage: two small frames keyed on 'id' whose 'value' column
    # mixes numbers and text, so both tolerance kinds are exercised.
    df1 = pd.DataFrame({
        'id': [1, 2, 3, 5, 6, 7, 8],
        'value': [100.0, 150.5, 200.1, "test", "test2", "test", 100.0],
    })
    df2 = pd.DataFrame({
        'id': [1, 2, 4, 5, 6, 7, 8],
        'value': [100.0, 150.8, 250.0, "test", "test", "tested", 102.0],
    })

    # Numbers may differ by up to 0.4, text by at most one edit.
    result_df = generic_reconcile(
        df1,
        df2,
        key_columns=['id'],
        comparison_columns=['value'],
        num_tolerance=0.4,
        lev_dist_tolerance=1,
    )
    print(result_df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment