Skip to content

Instantly share code, notes, and snippets.

@ekevoo
Last active Aug 5, 2019
Embed
What would you like to do?
file deduplicator
#!/usr/bin/python3
#
# MIT License
#
# Copyright (c) 2019 Ekevoo.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import collections
import functools
import hashlib
import sys
import zipfile
from pathlib import Path
from typing import Iterable, List, Set, Union
import tqdm
def main():
args = parse_args(sys.argv[1:])
directory = Path(args.target_directory)
found_files = sorted(traverse_directory(directory))
# run hashes
files_by_hash = collections.defaultdict(set)
print("Running:", args.hash_algorithm.__name__)
for path in tqdm.tqdm(found_files, ascii=True):
hashed = get_hash(path, args.hash_algorithm)
files_by_hash[hashed].add(path)
total = sum(len(s) for s in files_by_hash.values())
print("Visited %d files, obtained %d unique hashes." % (total, len(files_by_hash)))
# discard unique files
for key in [k for k, v in files_by_hash.items() if len(v) == 1]:
del files_by_hash[key]
# ask questions
collisions = sorted(files_by_hash.values(), key=lambda s: min(s).name.lower())
report_and_ask_for_action(collisions)
def parse_args(argv):
parser = argparse.ArgumentParser()
parser.add_argument("target_directory", nargs="?", default=".")
parser.add_argument("--hash-algorithm", default="sha256")
arguments = parser.parse_args(argv)
arguments.hash_algorithm = getattr(hashlib, arguments.hash_algorithm)
return arguments
def traverse_directory(directory: Path) -> Iterable[Path]:
for item in directory.iterdir():
if item.is_symlink():
pass # symlinks cause tons of false duplicates! :(
elif item.is_dir():
yield from traverse_directory(item)
elif item.is_file():
yield item
def get_hash(path, chosen_hash) -> bytes:
d = chosen_hash()
if path.suffix.lower() == ".zip":
# zip files contain irrelevant datetime information, but
# also precalculated CRC-32 hashes. so hash those along
# with their corresponding filenames & ignore contents.
with zipfile.ZipFile(path.open("rb")) as package:
zip_file_entries_info = package.infolist()
pre_hashed_entries = sorted(
e.filename.lower().encode() + e.CRC.to_bytes(4, "little")
for e in zip_file_entries_info
)
for entry in pre_hashed_entries:
d.update(entry)
else:
# unknown format, hash full contents of the file.
with path.open("rb") as stream:
for buf in iter(functools.partial(stream.read, 0x100), b""):
d.update(buf)
return d.digest()
def report_and_ask_for_action(collisions: List[Set[Path]]):
if not collisions:
print("No collisions were found! Yay! :D")
return
print("%d collisions were found:" % len(collisions))
for item in collisions:
print(" * %s" % "; ".join(sorted(f.with_suffix("").name for f in item)))
print()
print("==========================================================")
print("Let's start!")
for collision_set in collisions:
print("==========================================================")
choice = choose_for_collision_set(collision_set)
# choose what to delete (and whatever further preparatory steps)
if not choice:
forsaken = ()
print("~~~ No action.")
elif choice in collision_set:
forsaken = collision_set - {choice}
else:
old_path = next(iter(collision_set))
forsaken = collision_set - {old_path}
old_path.rename(choice)
print("~~~ Renamed: %s ~~~ To: %s" % (old_path, choice))
# burn the spares!
for saken in forsaken:
saken.unlink()
print("~~~ Deleted: %s" % saken)
def choose_for_collision_set(pathes: Set[Path]) -> Union[Path, str]:
choices = sorted(pathes, key=lambda p: p.name.lower())
while True:
for i, text in enumerate(choices):
print("%2d) %s" % (i, text))
try:
response = input("Pick one, type a new name, or ENTER to skip: ")
except (KeyboardInterrupt, EOFError):
print(" ...Bye!")
sys.exit(1)
# test for skip first
if not response:
return ""
# test for existing choice
try:
return choices[int(response)]
except (ValueError, IndexError):
pass
# add new choice
choices.append(response)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment