@jelmervdl
Created June 9, 2022 21:03
Near duplicate deduplication
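A small two-pass Python script for removing near-duplicate lines from a text file. The first pass hashes a normalized form of every line (lowercased, with all non-letter characters stripped) to group near duplicates, and records the line number of the most frequent exact variant in each group; the second pass streams the file again and writes only those lines to stdout, reporting the kept/total ratio on stderr. The input file (optionally gzip-compressed) is given as the first command-line argument.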
#!/usr/bin/env python3
import sys
import gzip
import os
from collections import defaultdict
from xxhash import xxh64
from unicodedata import category as cat
from unidecode import unidecode
from functools import reduce
from tqdm.autonotebook import tqdm
from io import TextIOWrapper, BufferedReader


class ProgressWrapper:
    """Wraps around a file-like object and shows a progress bar as to how much
    of it has been read."""

    def __init__(self, fh):
        self.fh = fh
        self.tqdm = tqdm(
            desc=fh.name,
            total=os.fstat(fh.fileno()).st_size,
            initial=fh.seekable() and fh.tell(),
            file=sys.stderr,
            unit='b',
            unit_scale=True)

    def __getattr__(self, attr: str):
        return getattr(self.fh, attr)

    def read(self, size: int = -1):
        data = self.fh.read(size)
        self.tqdm.update(len(data))
        return data

    def read1(self, size: int = -1):
        data = self.fh.read1(size)
        self.tqdm.update(len(data))
        return data

    def close(self) -> None:
        self.tqdm.close()
        self.fh.close()


class Entry:
    __slots__ = 'line', 'count'

    def __init__(self, line, count):
        self.line = line
        self.count = count


def magic_open(name, mode):
    fh = open(name, 'rb')
    fh = ProgressWrapper(fh)

    if name.endswith('.gz'):
        fh = gzip.open(fh, 'rb')

    if 't' in mode:
        fh = TextIOWrapper(fh)
    else:
        fh = BufferedReader(fh)

    return fh

# The whole aggressive deduping bit is from https://github.com/bitextor/bifixer/blob/b4705e83a0f8aa584aa95dcf946e1c78dcadbb44/bifixer/bifixer.py#L36
tbl = [chr(i) for i in range(sys.maxunicode) if not cat(chr(i)).startswith('L')]
remove_non_alpha = str.maketrans('', '', ''.join(tbl))
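# Illustration (not part of the pipeline): after lower() and this translation
# table, "Hello, World!" and "hello world" both become "helloworld", so lines
# that differ only in case, punctuation, digits or whitespace share a key.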
counts: dict[int,dict[int,Entry]] = defaultdict(lambda: {})
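# counts maps normalized-line hash -> {exact-line hash: Entry(line number of
# first occurrence, number of occurrences)}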
total = 0
with magic_open(sys.argv[1], 'rt') as fh:
    for n, line in enumerate(fh):
        # Hash of the actual line, to count exact duplicates
        line_hash = hash(line)

        # Hash of a normalized version of the line, to detect near duplicates
        # normalized = unidecode(line.lower().translate(remove_non_alpha))
        normalized = line.lower().translate(remove_non_alpha)
        normalized_hash = xxh64(normalized).intdigest()

        # Count how often this exact duplicate occurs in the near-duplicate entry
        entry = counts[normalized_hash]
        if line_hash in entry:
            entry[line_hash].count += 1
        else:
            entry[line_hash] = Entry(n, 1)
unique: list[int]
# Pick the most common variant for each near-duplicate
unique = [max(count.values(), key=lambda entry: entry.count).line for count in tqdm(counts.values())]
# Inverse unique: show all the variants of only lines that occur more than once
# unique = reduce(lambda acc, count: acc + [entry.line for entry in count.values()] if len(count) > 1 else acc, counts.values(), [])
del counts
# Print ratio to stderr
print("{} / {} = {:0.4f}%".format(len(unique), n + 1, len(unique) / (n + 1) * 100), file=sys.stderr)
# Iterate over input again, but only pass our selection to stdout
unique.sort()
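# With both the selection and the file in line-number order, the second pass
# only needs a single cursor (m) into the selection.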
m = 0
with magic_open(sys.argv[1], 'rb') as fh:
    for n, line in enumerate(fh):
        if unique[m] == n:
            sys.stdout.buffer.write(line)
            m += 1

            if m == len(unique):
                break
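
Example invocation (the script and corpus file names here are only placeholders): ./dedupe.py corpus.en.gz > corpus.dedup.en. Progress bars and the kept/total ratio go to stderr, so redirecting stdout captures just the deduplicated lines.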