-
-
Save kmatt/5d20fb0dbaffe875ebb81a261c5c77d6 to your computer and use it in GitHub Desktop.
Compare Excel sheets with Pandas
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Compare two Excel sheets | |
Inspired by https://pbpython.com/excel-diff-pandas-update.html | |
For the documentation, download this file and type: | |
python compare.py --help | |
""" | |
import argparse | |
import pandas as pd | |
import numpy as np | |
def report_diff(x): | |
"""Function to use with groupby.apply to highlihgt value changes.""" | |
return x[0] if x[0] == x[1] or pd.isna(x).all() else f'{x[0]} ---> {x[1]}' | |
def strip(x): | |
"""Function to use with applymap to strip whitespaces in a dataframe.""" | |
return x.strip() if isinstance(x, str) else x | |
def diff_pd(old_df, new_df, idx_col): | |
"""Identify differences between two pandas DataFrames using a key column. | |
Key column is assumed to have only unique data | |
(like a database unique id index) | |
Args: | |
old_df (pd.DataFrame): first dataframe | |
new_df (pd.DataFrame): second dataframe | |
idx_col (str|list(str)): column name(s) of the index, | |
needs to be present in both DataFrames | |
""" | |
# setting the column name as index for fast operations | |
old_df = old_df.set_index(idx_col) | |
new_df = new_df.set_index(idx_col) | |
# get the added and removed rows | |
old_keys = old_df.index | |
new_keys = new_df.index | |
if isinstance(old_keys, pd.MultiIndex): | |
removed_keys = old_keys.difference(new_keys) | |
added_keys = new_keys.difference(old_keys) | |
else: | |
removed_keys = np.setdiff1d(old_keys, new_keys) | |
added_keys = np.setdiff1d(new_keys, old_keys) | |
out_data = { | |
'removed': old_df.loc[removed_keys], | |
'added': new_df.loc[added_keys] | |
} | |
# focusing on common data of both dataframes | |
common_keys = np.intersect1d(old_keys, new_keys, assume_unique=True) | |
common_columns = np.intersect1d( | |
old_df.columns, new_df.columns, assume_unique=True | |
) | |
new_common = new_df.loc[common_keys, common_columns].applymap(strip) | |
old_common = old_df.loc[common_keys, common_columns].applymap(strip) | |
# get the changed rows keys by dropping identical rows | |
# (indexes are ignored, so we'll reset them) | |
common_data = pd.concat( | |
[old_common.reset_index(), new_common.reset_index()], sort=True | |
) | |
changed_keys = common_data.drop_duplicates(keep=False)[idx_col] | |
if isinstance(changed_keys, pd.Series): | |
changed_keys = changed_keys.unique() | |
else: | |
changed_keys = changed_keys.drop_duplicates().set_index(idx_col).index | |
# combining the changed rows via multi level columns | |
df_all_changes = pd.concat( | |
[old_common.loc[changed_keys], new_common.loc[changed_keys]], | |
axis='columns', | |
keys=['old', 'new'] | |
).swaplevel(axis='columns') | |
# using report_diff to merge the changes in a single cell with "-->" | |
df_changed = df_all_changes.groupby(level=0, axis=1).apply( | |
lambda frame: frame.apply(report_diff, axis=1)) | |
out_data['changed'] = df_changed | |
return out_data | |
def compare_excel( | |
path1, path2, out_path, sheet_name, index_col_name, **kwargs | |
): | |
old_df = pd.read_excel(path1, sheet_name=sheet_name, **kwargs) | |
new_df = pd.read_excel(path2, sheet_name=sheet_name, **kwargs) | |
diff = diff_pd(old_df, new_df, index_col_name) | |
with pd.ExcelWriter(out_path) as writer: | |
for sname, data in diff.items(): | |
data.to_excel(writer, sheet_name=sname) | |
print(f"Differences saved in {out_path}") | |
def build_parser(): | |
cfg = argparse.ArgumentParser( | |
description="Compares two excel files and outputs the differences " | |
"in another excel file." | |
) | |
cfg.add_argument("path1", help="Fist excel file") | |
cfg.add_argument("path2", help="Second excel file") | |
cfg.add_argument("sheetname", help="Name of the sheet to compare.") | |
cfg.add_argument( | |
"key_column", | |
help="Name of the column(s) with unique row identifier. It has to be " | |
"the actual text of the first row, not the excel notation." | |
"Use multiple times to create a composite index.", | |
nargs="+", | |
) | |
cfg.add_argument("-o", "--output-path", default="compared.xlsx", | |
help="Path of the comparison results") | |
cfg.add_argument("--skiprows", help='number of rows to skip', type=int, | |
action='append', default=None) | |
return cfg | |
def main(): | |
cfg = build_parser() | |
opt = cfg.parse_args() | |
compare_excel(opt.path1, opt.path2, opt.output_path, opt.sheetname, | |
opt.key_column, skiprows=opt.skiprows) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Excel compare test suite.""" | |
import io | |
import pandas as pd | |
import compare | |
def test_parser(): | |
cfg = compare.build_parser() | |
opt = cfg.parse_args(["test1.xlsx", "test2.xlsx", "Sheet 1", "Col1", "Col2", "-o", "output.xlsx"]) | |
assert opt.path1 == "test1.xlsx" | |
assert opt.path2 == "test2.xlsx" | |
assert opt.output_path == "output.xlsx" | |
assert opt.sheetname == "Sheet 1" | |
assert opt.key_column == ["Col1", "Col2"] | |
assert opt.skiprows is None | |
def build_excel_stream(df, sheetname): | |
"""Create an excel workbook as a file-like object.""" | |
output = io.BytesIO() | |
with pd.ExcelWriter(output, engine="xlsxwriter") as writer: | |
df.to_excel(writer, sheet_name=sheetname, index=False) | |
return output | |
def sample_xlsx(df_1, df_2): | |
xlsx_1 = build_excel_stream(df_1, "Sheet1") | |
xlsx_2 = build_excel_stream(df_2, "Sheet1") | |
return xlsx_1, xlsx_2 | |
def sample_dfs(): | |
df_1 = pd.DataFrame({ | |
"ID": [123456, 654321, 543219, 432198, 765432], | |
"Name": ["Lemonade", "Cola", "Orange", "Fruit Punch", "Tobacco"], | |
"Flavour Description": ["Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Smoky"], | |
}) | |
df_2 = pd.DataFrame({ | |
"ID": [123456, 654321, 543219, 432198, 876543], | |
"Name": ["Lemonade", "Cola", "Orange", "Fruit Punch", "Soda"], | |
"Flavour Description": ["Fuzzy", "Bubbly", "Fuzzy", "Fuzzy", "Sugary"], | |
}) | |
return df_1, df_2 | |
def run_assertion(diff): | |
changed = diff["changed"] | |
assert len(changed) == 1 | |
assert changed.iloc[0]["Flavour Description"] == "Fuzzy ---> Bubbly" | |
added = diff["added"] | |
assert len(added) == 1 | |
assert added.iloc[0]["Flavour Description"] == "Sugary" | |
removed = diff["removed"] | |
assert len(removed) == 1 | |
assert removed.iloc[0]["Flavour Description"] == "Smoky" | |
print("OK.") | |
def test_single_index(): | |
df_1, df_2 = sample_dfs() | |
diff = compare.diff_pd(df_1, df_2, ["ID"]) | |
run_assertion(diff) | |
def test_single_index_excel(): | |
xlsx_1, xlsx_2 = sample_xlsx(*sample_dfs()) | |
diff_io = io.BytesIO() | |
compare.compare_excel(xlsx_1, xlsx_2, diff_io, "Sheet1", "ID") | |
diff = pd.read_excel(diff_io, sheet_name=None) | |
run_assertion(diff) | |
def sample_multiindex_dfs(): | |
df_1 = pd.DataFrame({ | |
"ID": [123456, 123456, 654321, 543219, 432198, 765432], | |
"Name": ["Lemonade", "Lemonade", "Cola", "Orange", "Fruit Punch", "Tobacco"], | |
"Flavour ID": [1, 2, None, None, None, None], | |
"Flavour Description": ["Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Fuzzy", "Smoky"], | |
}) | |
df_2 = pd.DataFrame({ | |
"ID": [123456, 123456, 654321, 543219, 432198, 876543], | |
"Name": ["Lemonade", "Lemonade", "Cola", "Orange", "Fruit Punch", "Soda"], | |
"Flavour ID": [1, 2, None, None, None, None], | |
"Flavour Description": ["Fuzzy", "Bubbly", "Fuzzy", "Fuzzy", "Fuzzy", "Sugary"], | |
}) | |
return df_1, df_2 | |
def test_multiindex(): | |
df_1, df_2 = sample_multiindex_dfs() | |
diff = compare.diff_pd(df_1, df_2, ["ID", "Flavour ID"]) | |
run_assertion(diff) | |
def test_multiindex_excel(): | |
xlsx_1, xlsx_2 = sample_xlsx(*sample_multiindex_dfs()) | |
diff_io = io.BytesIO() | |
compare.compare_excel(xlsx_1, xlsx_2, diff_io, "Sheet1", ["ID", "Flavour ID"]) | |
diff = pd.read_excel(diff_io, sheet_name=None) | |
run_assertion(diff) | |
if __name__ == '__main__': | |
test_multiindex() | |
test_multiindex_excel() | |
test_single_index() | |
test_single_index_excel() | |
test_parser() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment