Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A program that compares CSV data between the “before” and the “after” after. Both CSV must have a unique identifier specified in a `key` column. This will output the new rows, updated rows and the deleted rows' keys.
#!/usr/bin/env python3.7
"""
Shows changes in product information from CSV files.
"""
from typing import Dict, List, Tuple, Set, Any, Hashable
import argparse
import csv
import logging
import logging.config
import sys
log_config_dict = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": "%(asctime)s [%(levelname)7s] %(name)s: %(message)s"
},
},
"handlers": {
"cli": {
"level": "INFO",
"formatter": "standard",
"class": "logging.StreamHandler",
},
"file": {
"level": "INFO",
"filename": "status-check.log",
"class": "logging.FileHandler",
"formatter": "standard",
},
},
"loggers": {
"": {"handlers": ["file", "cli"], "level": "INFO", "propagate": True}
},
}
logging.config.dictConfig(log_config_dict)
logger = logging.getLogger(__name__)
class ProductInformation:
def __init__(self, data: Dict[Hashable, Dict], fields: List):
self._data = data
self._fields = fields
@classmethod
def from_csv(cls, csv_path: str, key: str):
data = {}
fields = []
with open(csv_path, "r") as file:
reader = csv.DictReader(file)
fields = reader.fieldnames
for row_index, row in enumerate(reader, 1):
if key in row:
data[row[key]] = row
else:
logger.warning(
"Key '{}' is not present in {}:{}. Discarded. ".format(
key, csv_path, row_index
)
)
return cls(data, fields)
@property
def data(self) -> List[Dict]:
return self._data.values()
@property
def fields(self) -> List[str]:
return self._fields
def to_csv(self, fp, fields: List[str] = None):
fields = fields or self.fields
# If fields is subset of self.fields
# then extra fields should be ignored
writer = csv.DictWriter(fp, extrasaction="ignore", fieldnames=fields)
writer.writeheader()
for row in self.data:
writer.writerow(row)
def get(self, key: str, default: Any = None) -> Any:
return self._data.get(key, default)
def keys(self) -> List[str]:
return self._data.keys()
def __len__(self):
return len(self.keys())
def __sub__(self, other):
data = {}
for key in set(self.keys()) - set(other.keys()):
data[key] = self.get(key)
return self.__class__(data, self.fields)
class ChangeTracker:
def __init__(self, before: ProductInformation, after: ProductInformation):
self.before = before
self.after = after
def creates(self) -> ProductInformation:
return self.after - self.before
def updates(self) -> ProductInformation:
common_product_keys = set(self.before.keys()) & set(self.after.keys())
data = {}
for product_key in common_product_keys:
if self.before.get(product_key) != self.after.get(product_key):
data[product_key] = self.after.get(product_key)
return ProductInformation(data, self.after.fields)
def deletes(self) -> ProductInformation:
return self.before - self.after
def calculate_create_update_delete(
before_csv: str, after_csv: str, key: str = "id"
) -> Tuple[ProductInformation, ProductInformation, ProductInformation]:
before_product_info = ProductInformation.from_csv(before_csv, key=key)
after_product_info = ProductInformation.from_csv(after_csv, key=key)
change_tracker = ChangeTracker(before_product_info, after_product_info)
return (
change_tracker.creates(),
change_tracker.updates(),
change_tracker.deletes(),
)
def main(
before_csv: str, after_csv: str
) -> Tuple[List[Dict], List[Dict], Set[str]]:
creates, updates, deletes = calculate_create_update_delete(
before_csv, after_csv, "id"
)
return list(creates.data), list(updates.data), set(deletes.keys())
def command_line():
parser = argparse.ArgumentParser(__doc__)
parser.add_argument("before", help="Old production information CSV")
parser.add_argument("after", help="New production information CSV")
parser.add_argument(
"-k",
"--key",
help="CSV column name to identify product key",
default="id",
)
args = parser.parse_args()
creates, updates, deletes = calculate_create_update_delete(
args.before, args.after, args.key
)
if creates:
logger.info("CREATES")
creates.to_csv(sys.stdout)
if updates:
logger.info("UPDATES")
updates.to_csv(sys.stdout)
if deletes:
logger.info("DELETES")
deletes.to_csv(sys.stdout, fields=["id"])
if __name__ == "__main__":
command_line()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment