Skip to content

Instantly share code, notes, and snippets.

@cmpute
Created October 26, 2023 09:38
Show Gist options
  • Save cmpute/300b10ebaaf84b825e0536030524c493 to your computer and use it in GitHub Desktop.
Save cmpute/300b10ebaaf84b825e0536030524c493 to your computer and use it in GitHub Desktop.
File deduplication
import sys, os, binascii
from imohash import hashfile
from pathlib import Path
from tqdm import tqdm
from collections import defaultdict
from time import time
from hashlib import md5
# When True, files are grouped by the value returned by imohash's hashfile();
# when False, they are grouped by bare file name only.
USE_HASH = True
# When True, dedup_inplace() prints per-file and running-average timings.
PROFILE = False
# for demonstrating hash performance
def hashfullfile(file: Path):
    """Return the MD5 digest of the full content of *file* as raw bytes.

    Every byte of the file is hashed, which makes this a baseline for
    comparing hashing performance against ``hashfile``.

    Args:
        file: Path of the file to hash.

    Returns:
        The 16-byte MD5 digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = md5()
    with file.open("rb") as stream:
        # Feed the hash in 1 MiB chunks so large files need not fit in memory.
        for chunk in iter(lambda: stream.read(1 << 20), b""):
            hasher.update(chunk)
    # digest must be *called*; returning the bound method yields no hash.
    return hasher.digest()
def dedup_inplace(dir: Path):
    """Scan *dir* recursively and print every group of duplicate files.

    Files are grouped by the value returned by ``hashfile`` when ``USE_HASH``
    is true, otherwise by bare file name. Any group with more than one member
    is printed. Nothing is deleted or modified.

    A file whose hash cannot be computed is reported and left out of every
    group. Pressing Ctrl-C stops the scan early; the groups found so far are
    still reported.

    Args:
        dir: Root directory to scan.
    """
    # Every visited file; the maps below store indices into this list.
    database = []
    dir_count = 0
    name_map = defaultdict(list)
    fhash_map = defaultdict(list)

    # collect file info
    t_start = time()
    t_prev = t_start
    file_count = 0
    try:
        for path, _dirs, files in tqdm(os.walk(dir), leave=False):
            dir_count += 1
            for file in files:
                # os.walk() already yields directory paths rooted at `dir`;
                # joining `dir` in front again would duplicate the prefix
                # whenever `dir` is a relative path.
                file = Path(path) / file
                file_idx = len(database)
                database.append(file)

                if USE_HASH:
                    try:
                        fhash_map[hashfile(file)].append(file_idx)
                    except Exception:
                        # Best effort: unreadable files are skipped, not fatal.
                        tqdm.write("Error occured in reading " + str(file))
                else:
                    name_map[file.name].append(file_idx)

                if PROFILE:
                    file_count += 1
                    now = time()
                    tqdm.write("Processed in %f secs (%f avg)"
                               % (now - t_prev, (now - t_start) / file_count))
                    t_prev = time()
    except KeyboardInterrupt:
        # Only user interruption is swallowed; real errors propagate.
        tqdm.write("Execution terminated!")

    # report duplicates
    print("Total %d directories" % dir_count)
    map_to_use = fhash_map if USE_HASH else name_map
    for key, file_idxs in map_to_use.items():
        if len(file_idxs) <= 1:
            continue
        if USE_HASH:
            # Hash keys are raw bytes; show them as hex.
            key = binascii.hexlify(key).decode()
        print("----------")
        print("Duplicate:", key)
        for idx in file_idxs:
            print("\t" + str(database[idx]))
def dedup_with_target(dedup_dir: Path, base_dir: Path):
    """Print files that occur in both *dedup_dir* and *base_dir*.

    Both trees are scanned recursively. Files are grouped by the value
    returned by ``hashfile`` when ``USE_HASH`` is true, otherwise by bare file
    name. A group is printed only when it has members from both trees; groups
    that live entirely inside one tree are skipped. In the report, files from
    *dedup_dir* are prefixed with ``+`` and files from *base_dir* with ``-``.
    Nothing is deleted or modified.

    A file whose hash cannot be computed is reported and left out of every
    group. Pressing Ctrl-C stops the scan early; whatever was collected so
    far is still reported.

    Args:
        dedup_dir: Tree whose files are checked for copies in *base_dir*.
        base_dir: Tree to compare against.
    """
    src_database = []  # files found under base_dir
    dst_database = []  # files found under dedup_dir
    dir_count = 0
    name_map = defaultdict(list)
    fhash_map = defaultdict(list)

    # collect file info, base_dir first and then dedup_dir; map entries are
    # (in_dst, index) pairs pointing into the matching database list
    scans = (
        (base_dir, src_database, False),
        (dedup_dir, dst_database, True),
    )
    try:
        for root, database, in_dst in scans:
            for path, _dirs, files in tqdm(os.walk(root), leave=False):
                dir_count += 1
                for file in files:
                    # os.walk() already yields directory paths rooted at
                    # `root`; joining `root` in front again would duplicate
                    # the prefix whenever `root` is a relative path.
                    file = Path(path) / file
                    file_idx = len(database)
                    database.append(file)
                    if USE_HASH:
                        try:
                            fhash_map[hashfile(file)].append((in_dst, file_idx))
                        except Exception:
                            # Best effort in both trees: skip unreadable files.
                            tqdm.write("Error occured in reading " + str(file))
                    else:
                        name_map[file.name].append((in_dst, file_idx))
    except KeyboardInterrupt:
        # Only user interruption is swallowed; real errors propagate.
        tqdm.write("Execution terminated!")

    # report duplicates
    print("Total %d directories" % dir_count)
    map_to_use = fhash_map if USE_HASH else name_map
    for key, file_idxs in map_to_use.items():
        if len(file_idxs) <= 1:
            continue
        first_side = file_idxs[0][0]
        if all(in_dst == first_side for in_dst, _ in file_idxs):
            # skip inplace duplicates (all members come from the same tree)
            continue
        if USE_HASH:
            # Hash keys are raw bytes; show them as hex.
            key = binascii.hexlify(key).decode()
        print("----------")
        print("Duplicate:", key)
        for in_dst, idx in file_idxs:
            if in_dst:
                print("\t+ " + str(dst_database[idx]))
            else:
                print("\t- " + str(src_database[idx]))
if __name__ == "__main__":
    # Dispatch on the raw argument count (program name included):
    # two paths -> compare trees, one path -> in-place scan,
    # no path -> usage hint, anything else -> rejected.
    argv = sys.argv
    argc = len(argv)
    if argc == 3:
        dedup_with_target(Path(argv[1]), Path(argv[2]))
    elif argc == 2:
        dedup_inplace(Path(argv[1]))
    elif argc == 1:
        print("Please provide paths input: python dedup.py <dedup directory> [target directory]")
    else:
        print("Unrecongized input!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment