Skip to content

Instantly share code, notes, and snippets.

@shtrom
Last active February 1, 2024 22:00
Show Gist options
  • Save shtrom/1ec5c12c9cf669ac785533adc1f46dff to your computer and use it in GitHub Desktop.
Save shtrom/1ec5c12c9cf669ac785533adc1f46dff to your computer and use it in GitHub Desktop.
Semi-automated tag deduplicator for Wallabag
#!/bin/sh
# Create a dedicated virtualenv for wallabag_merge_tags and install the pinned
# dependencies from requirements.txt into it.

# Abort on the first failure: without this, a failed venv creation or
# activation would let `pip install` run against whatever environment happens
# to be active.
set -e

python3 -m venv .venv/wallabag_merge_tags
. .venv/wallabag_merge_tags/bin/activate
pip install -r requirements.txt

# The activation above only affects this script's own shell, so remind the
# user (on stderr) to activate the venv in theirs.
cat << EOF >&2
*** Now run
. .venv/wallabag_merge_tags/bin/activate
EOF
fuzzywuzzy==0.18.0
pyspellchecker==0.5.6
python-Levenshtein==0.12.1
#!/usr/bin/env python3
#
import csv
import logging
import sys
from fuzzywuzzy import process
from spellchecker import SpellChecker
def main(infilename: str, whitelist: str = None) -> list:
    """Build the SQL statements that merge near-duplicate tags.

    Args:
        infilename: path to a CSV export of the tag table; rows are read
            with ``read_csv``.
        whitelist: optional path to a text file with one label per line.
            Labels listed there are handed to ``merge_data`` as the
            whitelist.

    Returns:
        The list of SQL statement strings produced by ``make_sql``.
    """
    logging.basicConfig(level=logging.INFO)
    data = read_csv(infilename)

    # Always a set: it is only used for membership tests, and keeping one
    # type avoids the list-or-dict ambiguity of the empty/loaded cases.
    wl = set()
    if whitelist:
        logging.info(f"Using whitelist: {whitelist}")
        with open(whitelist) as fd:
            wl = set(fd.read().splitlines())
        logging.info(f"Whitelist has {len(wl)} entries")

    actions = merge_data(data, wl)
    logging.info(f"Merged rows: {len(actions)}")
    return make_sql(actions)
def read_csv(filename: str) -> list:
    """Read a CSV file with a header row into a list of per-row dicts.

    Args:
        filename: path of the CSV file to read.

    Returns:
        One mapping per data row, keyed by the header names, in file order.
        A file with only a header yields an empty list.
    """
    # newline='' is what the csv module asks for: it lets the reader do its
    # own newline handling, so line breaks embedded in quoted fields are
    # returned exactly as stored instead of being rewritten.
    with open(filename, newline='') as csvfile:
        return list(csv.DictReader(csvfile))
def _swap_reason(label: str, suggested_label: str, spell):
    """Decide whether ``label`` should replace ``suggested_label`` as the kept tag.

    Returns the message to log when the incoming ``label`` looks like the
    better form, or ``None`` when the already-kept ``suggested_label`` should
    stay. ``spell`` is the ``SpellChecker`` instance used for the last check.
    """
    if suggested_label.strip() != suggested_label:
        return (f"Suggested label '{suggested_label}' has whitespace issues, "
                f"swapping '{label}' in ...")
    # endswith() rather than [-1] so an empty label cannot raise IndexError.
    if suggested_label.endswith(('.', ',')):
        return (f"Suggested label '{suggested_label}' has trailing punctuation issues, "
                f"swapping '{label}' in ...")
    if '(' in label and '(' not in suggested_label:
        return f"New label '{label}' has parens, swapping '{suggested_label}' out ..."
    if ' ' in label and ' ' not in suggested_label:
        return f"New label '{label}' has spaces, swapping '{suggested_label}' out ..."

    # Last resort: if the kept label contains words the spell checker does
    # not know, and the new label contains one of the suggested corrections,
    # treat the new label as the corrected spelling.
    words = label.split()
    for typo in spell.unknown(suggested_label.split()):
        # NOTE(review): some pyspellchecker releases appear to return None
        # when there are no candidates; `or ()` keeps the test safe.
        candidates = spell.candidates(typo) or ()
        if any(word in candidates for word in words):
            return (f"New label '{label}' is a corrected spelling of "
                    f"'{suggested_label}', swapping out ...")
    return None


def merge_data(data: list, whitelist: list, threshold: int = 95) -> dict:
    """Find near-duplicate tag labels and plan how to merge them.

    Rows are processed in order. Each label is fuzzy-matched against the
    labels kept so far; a match scoring at least ``threshold`` is merged into
    the kept label (or replaces it, when ``_swap_reason`` judges the new label
    to be the better form). Whitelisted labels are never merged away, and a
    ``wb:``-prefixed label is never merged into a label without that prefix.

    Args:
        data: rows as mappings with at least the keys ``'id'`` and ``'label'``.
        whitelist: container of labels that must be kept as they are; only
            membership tests (``in``) are performed on it.
        threshold: minimum fuzzy-match score for two labels to be considered
            duplicates. Defaults to 95.

    Returns:
        A dict keyed by the label being merged away. Each value has
        ``'update'`` = ``(old_row_id, kept_row_id, kept_label)`` and
        ``'delete'`` = ``old_row_id``. If interrupted with Ctrl-C, the
        actions collected so far are returned.
    """
    spell = SpellChecker()
    rows_by_label = {}
    actions = {}
    choices = []
    try:
        for row in data:
            row_id = row['id']
            label = row['label']
            suggestion = process.extractOne(label, choices)
            if (label in whitelist
                    or suggestion is None
                    or suggestion[1] < threshold):
                logging.debug(f'Keeping {label} ...')
                rows_by_label[label] = row_id
                choices.append(label)
                continue

            suggested_label = suggestion[0]
            suggested_row_id = rows_by_label[suggested_label]

            if label.startswith('wb:') \
                    and not suggested_label.startswith('wb:'):
                logging.info(
                    f"Skipping merge of {label} into {suggested_label} ...")
                continue

            reason = _swap_reason(label, suggested_label, spell)
            swap = reason is not None
            if swap:
                logging.info(reason)
                # From here on `label` is the one going away and
                # `suggested_label` the one being kept.
                suggested_label, suggested_row_id, label, row_id = \
                    label, row_id, suggested_label, suggested_row_id

            if label in whitelist:
                logging.info(
                    f"Skipping merge of {label} into {suggested_label} (whitelisted)...")
                continue

            if swap:
                # The previously kept label is being merged away: later rows
                # must match against its replacement instead.
                choices.remove(label)
                choices.append(suggested_label)
                rows_by_label[suggested_label] = suggested_row_id

            logging.info(
                f"Merging '{label}' into '{suggested_label}' (ratio: {suggestion[1]}) ...")
            actions[label] = {
                'update': (row_id, suggested_row_id, suggested_label),
                'delete': row_id,
            }
    except KeyboardInterrupt:
        # Deliberate best-effort: matching is slow, so Ctrl-C stops the scan
        # and returns what has been found so far.
        logging.warning(
            f"Interrupted; returning the {len(actions)} merges found so far")
    return actions
def _sql_comment_text(text: str) -> str:
    """Collapse line breaks so ``text`` cannot terminate a ``--`` SQL comment."""
    return ' '.join(str(text).splitlines())


def make_sql(actions: dict) -> list:
    """Render merge actions as SQL statements.

    For every merged label two statements are emitted, in this order: an
    ``UPDATE IGNORE`` re-pointing ``wallabag_entry_tag`` rows from the old tag
    id to the kept one, then a ``DELETE`` of the old row in ``wallabag_tag``.
    Each statement ends with a ``--`` comment naming the labels involved.

    Args:
        actions: mapping of merged-away label to a dict with ``'update'`` =
            ``(old_id, kept_id, kept_label)`` and ``'delete'`` = ``old_id``.

    Returns:
        The statements as a flat list of strings, two per action.

    Raises:
        ValueError: if an id is not an integer. Ids are interpolated into
            the SQL text, so anything else is refused rather than emitted.
    """
    sql = []
    for label, action in actions.items():
        old_id, kept_id, kept_label = action['update']
        old_id = int(old_id)
        kept_id = int(kept_id)
        deleted_id = int(action['delete'])

        # Labels are free text from the database; a line break inside one
        # would end the comment and turn the rest of the label into SQL.
        safe_label = _sql_comment_text(label)
        safe_kept_label = _sql_comment_text(kept_label)

        sql.append(
            f"UPDATE IGNORE wallabag_entry_tag SET tag_id={kept_id} WHERE tag_id={old_id}; -- '{safe_label}' to '{safe_kept_label}'")
        sql.append(
            f"DELETE FROM wallabag_tag WHERE id={deleted_id}; -- '{safe_label}'")
    return sql
# Command-line entry point: wallabag_merge_tags.py TAGS_CSV [WHITELIST_FILE]
# Prints the generated SQL statements to stdout, one per line.
if __name__ == '__main__':
    whitelist = None
    if (argc := len(sys.argv)) < 2:
        # Usage goes to stderr: stdout is meant to be redirected into the
        # .sql file, so help text there would end up inside the SQL.
        print('usage:\n'
              + '\n mycli -h DBHOST --csv -e \'SELECT * FROM wallabag_tag\' wallabag > wallabag_tag.csv'
              + '\n ./wallabag_merge_tags.py wallabag_tag.csv whitelist > merge_tags.sql',
              file=sys.stderr)
        sys.exit(1)
    elif argc > 2:
        whitelist = sys.argv[2]
    for sql in main(sys.argv[1], whitelist):
        print(sql)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment