|
#!/usr/bin/env python3 |
|
# |
|
|
|
import csv |
|
import logging |
|
import sys |
|
|
|
from fuzzywuzzy import process |
|
from spellchecker import SpellChecker |
|
|
|
|
|
def main(infilename: str, whitelist: list = None) -> list: |
|
logging.basicConfig(level=logging.INFO) |
|
data = read_csv(infilename) |
|
wl = [] |
|
if whitelist: |
|
logging.info(f"Using whitelist: {whitelist}") |
|
with open(whitelist) as fd: |
|
wl = {k: True for k in fd.read().splitlines()} |
|
logging.info(f"Whitelist has {len(wl)} entries") |
|
actions = merge_data(data, wl) |
|
logging.info(f"Merged rows: {len(actions)}") |
|
return make_sql(actions) |
|
|
|
|
|
def read_csv(filename: str) -> list: |
|
data = [] |
|
with open(filename) as csvfile: |
|
reader = csv.DictReader(csvfile) |
|
for row in reader: |
|
data.append(row) |
|
return data |
|
|
|
|
|
def merge_data(data: list, whitelist: list) -> dict: |
|
d = SpellChecker() |
|
rows_by_label = {} |
|
actions = { |
|
} |
|
choices = [] |
|
try: |
|
for row in data: |
|
row_id = row['id'] |
|
label = row['label'] |
|
suggestion = process.extractOne(label, choices) |
|
if label in whitelist or suggestion is None or suggestion[1] < 95: |
|
logging.debug(f'Keeping {label} ...') |
|
rows_by_label[label] = row_id |
|
choices.append(label) |
|
else: |
|
swap = False |
|
suggested_label = suggestion[0] |
|
suggested_row_id = rows_by_label[suggested_label] |
|
|
|
if label.startswith('wb:') \ |
|
and not suggested_label.startswith('wb:'): |
|
logging.info( |
|
f"Skipping merge of {label} into {suggested_label} ...") |
|
continue |
|
|
|
unknown = d.unknown(suggested_label.split()) |
|
if suggested_label.strip() != suggested_label: |
|
logging.info( |
|
f"Suggested label '{suggested_label}' has whitespace issues, swapping '{label}' in ...") |
|
swap = True |
|
elif suggested_label[-1] in ['.', ',']: |
|
logging.info( |
|
f"Suggested label '{suggested_label}' has trailing punctuation issues, swapping '{label}' in ...") |
|
swap = True |
|
elif suggested_label.find('(') < 0 and label.find('(') > -1: |
|
logging.info( |
|
f"New label '{label}' has parens, swapping '{suggested_label}' out ...") |
|
swap = True |
|
elif suggested_label.find(' ') < 0 and label.find(' ') > -1: |
|
logging.info( |
|
f"New label '{label}' has spaces, swapping '{suggested_label}' out ...") |
|
swap = True |
|
elif len(unknown) and any([x in y |
|
for x in label.split() |
|
for y in [d.candidates(u) |
|
for u in unknown] |
|
]): |
|
logging.info( |
|
f"New label '{label}' is a corrected spelling of '{suggested_label}', swapping out ...") |
|
swap = True |
|
|
|
if swap: |
|
suggested_label, suggested_row_id, \ |
|
label, row_id = \ |
|
label, row_id, \ |
|
suggested_label, suggested_row_id |
|
|
|
if label in whitelist: |
|
logging.info( |
|
f"Skipping merge of {label} into {suggested_label} (whitelisted)...") |
|
continue |
|
|
|
if swap: |
|
choices.remove(label) |
|
choices.append(suggested_label) |
|
rows_by_label[suggested_label] = suggested_row_id |
|
|
|
logging.info( |
|
f"Merging '{label}' into '{suggested_label}' (ratio: {suggestion[1]}) ...") |
|
actions[label] = {} |
|
actions[label]['update'] = ( |
|
row_id, suggested_row_id, suggested_label) |
|
actions[label]['delete'] = row_id |
|
except KeyboardInterrupt: |
|
pass |
|
|
|
return actions |
|
|
|
|
|
def make_sql(actions: dict) -> list: |
|
sql = [] |
|
for label in actions: |
|
update = actions[label]['update'] |
|
delete = actions[label]['delete'] |
|
sql.append( |
|
f"UPDATE IGNORE wallabag_entry_tag SET tag_id={update[1]} WHERE tag_id={update[0]}; -- '{label}' to '{update[2]}'") |
|
sql.append( |
|
f"DELETE FROM wallabag_tag WHERE id={delete}; -- '{label}'") |
|
|
|
return sql |
|
|
|
|
|
if __name__ == '__main__': |
|
whitelist = None |
|
if (l := len(sys.argv)) < 2: |
|
print('usage:\n' |
|
+ '\n mycli -h DBHOST --csv -e \'SELECT * FROM wallabag_tag\' wallabag > wallabag_tag.csv' |
|
+ '\n ./wallabag_merge_tags.py wallabag_tag.csv whitelist > merge_tags.sql' |
|
) |
|
sys.exit(1) |
|
elif l > 2: |
|
whitelist = sys.argv[2] |
|
for sql in main(sys.argv[1], whitelist): |
|
print(sql) |