Fast duplicate file/link finder (and optionally deleter), for Posix/NT, written in Python 3
#!/usr/bin/env python3
# coding=utf-8
# pylint: disable=C0301,R0902,R0903,R0913,W0703
"""
Fast duplicate file/link finder (and deleter)
Usage: duplicates.py [-h] [-d] [-a HASH_ALGORITHM] [-c CHUNK_SIZE] [-m MAX_SIZE] [-l LOG_LEVEL] [paths ...]
Based on https://stackoverflow.com/a/36113168/300783 by Todor Minakov
Based on https://gist.github.com/ntjess/1663d25d09bd762af2f0c60f600191f5 by Nathan Jessurun
https://gist.github.com/platomav/48663c289a1858f140869792357f6fa8 by Plato Mavropoulos
Added duplicate link detection, duplicate deletion option, sorted results, detailed status,
message logging, object-oriented structure, variable chunk size and hash algorithm inputs.
"""
import getpass
import hashlib
import os
import stat
import subprocess
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from logging import getLogger, INFO, Logger, StreamHandler


class Duplicates:
    """ Fast duplicate file/link finder (and deleter) """

    def __init__(self, in_paths: list | None = None, is_delete: bool | None = None, hash_alg: str | None = None,
                 chunk_size: int | None = None, max_size: int | None = None, log_level: int | None = None,
                 log_logger: Logger | None = None) -> None:
        _log_level: int = log_level if isinstance(log_level, int) else INFO

        if isinstance(log_logger, Logger):
            self.logger: Logger = log_logger
            self.logger.setLevel(_log_level)
        else:
            self.logger = getLogger(__name__)
            self.logger.setLevel(_log_level)

            self.logger_handler: StreamHandler = StreamHandler()
            self.logger_handler.setLevel(self.logger.getEffectiveLevel())

            self.logger.addHandler(self.logger_handler)

        self.check_paths: list = in_paths if isinstance(in_paths, list) and in_paths else []
        self.is_delete: bool = is_delete if isinstance(is_delete, bool) else False
        self.hash_algorithm: str = str(hash_alg) if hash_alg in hashlib.algorithms_guaranteed else 'sha256'
        self.chunk_size: int = chunk_size if isinstance(chunk_size, int) and chunk_size > 0 else 1024
        self.max_size: int = max_size if isinstance(max_size, int) and max_size > 0 else 0x200000000

        # Value passed to the follow_symlinks parameter of os.access: False (no-dereference) when
        # the platform supports it, True (the default) otherwise, avoiding NotImplementedError
        self.follow_symlinks_false: bool = os.access not in os.supports_follow_symlinks

        self.initial_size: int = 0
        self.initial_count: int = 0
        self.duplicate_size: int = 0
        self.duplicate_count: int = 0

        self.files_by_size: dict = defaultdict(list)
        self.files_by_full_hash: dict = defaultdict(list)
        self.files_by_partial_hash: dict = defaultdict(list)

        self.duplicate_ratio: str = ''
        self.size_reduction: str = ''

    def _process_input_paths(self) -> None:
        """ Process input files/links/directories """
        for check_path in self.check_paths:
            check_path_abs: str = os.path.abspath(check_path)

            if os.path.isdir(check_path_abs):
                # noinspection PyArgumentEqualDefault
                for root_path, _, file_names in os.walk(check_path_abs, followlinks=False):
                    for file_name in file_names:
                        self._get_input_info(input_path=os.path.join(root_path, file_name))
            elif self._is_valid_path(input_path=check_path_abs, allow_broken_links=True):
                self._get_input_info(input_path=check_path_abs)
            else:
                self.logger.error('Input path is neither file/link nor directory: %s', check_path_abs)

    # noinspection PyTypeChecker
    def _process_duplicate_paths(self) -> None:
        """ Delete duplicates (if chosen) and show a summary of all processed files """
        for file_hash, file_list in sorted(self.files_by_full_hash.items()):
            file_list_len: int = len(file_list)

            if file_list_len >= 2:
                self.logger.info('Found %d files with hash %s', file_list_len, file_hash)

                for file_index, file_path in enumerate(sorted(file_list, key=lambda fp: (len(fp), fp))):
                    if file_index == 0:
                        self.logger.info('%s [Original]', file_path)
                    else:
                        if os.path.islink(file_path):
                            self.duplicate_size += len(self._get_link_data(link_path=file_path))
                        else:
                            self.duplicate_size += os.path.getsize(file_path)

                        if self.is_delete:
                            self._delete_path(input_path=file_path)
                        else:
                            self.logger.info('%s [Duplicate]', file_path)

    def _get_input_hash(self, input_path: str, first_chunk: bool = False) -> str:
        """ Calculate input hash, first chunk only or entire contents """
        hash_object = getattr(hashlib, self.hash_algorithm)()

        if self._is_path_accessible(input_path=input_path, fix_access=True, allow_links=True):
            if first_chunk:
                if os.path.islink(input_path):
                    hash_object.update(self._get_link_data(link_path=input_path)[:self.chunk_size])
                else:
                    with open(input_path, 'rb') as file_object:
                        hash_object.update(file_object.read(self.chunk_size))
            else:
                if os.path.islink(input_path):
                    hash_object.update(self._get_link_data(link_path=input_path))
                else:
                    with open(input_path, 'rb') as file_object:
                        hash_object.update(file_object.read())

        return hash_object.hexdigest().upper()

    def _get_input_info(self, input_path: str) -> None:
        """ Get input information (size, count) """
        if self._is_path_accessible(input_path=input_path, fix_access=True, allow_links=True):
            if os.path.islink(input_path):
                file_size: int = len(self._get_link_data(link_path=input_path))
            else:
                file_size = os.path.getsize(input_path)

            self.initial_count += 1
            self.initial_size += file_size

            self.files_by_size[file_size].append(input_path)

    def _get_hashes_partial(self) -> None:
        """ For all same-size files, get the partial hash of their first data chunk """
        for file_size, file_paths in self.files_by_size.items():
            if len(file_paths) >= 2:
                for file_path in file_paths:
                    hash_partial: str = self._get_input_hash(input_path=file_path, first_chunk=True)

                    # Add this file to the list of others sharing the same partial hash
                    self.files_by_partial_hash[(file_size, hash_partial)].append(file_path)

    def _get_hashes_full(self) -> None:
        """ For all files sharing a partial hash, get their full data hash (collisions are duplicates) """
        for file_paths in self.files_by_partial_hash.values():
            if len(file_paths) >= 2:
                for file_path in file_paths:
                    # noinspection PyArgumentEqualDefault
                    hash_full: str = self._get_input_hash(input_path=file_path, first_chunk=False)

                    # Add this file to the list of others sharing the same full hash
                    self.files_by_full_hash[hash_full].append(file_path)

    def _is_path_accessible(self, input_path: str, fix_access: bool = False, allow_links: bool = False) -> bool:
        """ Check if file/link is accessible, otherwise attempt to fix access """
        input_path_abs: str = os.path.abspath(input_path)

        try:
            # Check (and optionally fix) input path read access, no-dereference logic (when applicable)
            if not os.access(path=input_path_abs, mode=os.R_OK, follow_symlinks=self.follow_symlinks_false):
                if fix_access:
                    self._fix_path_access(input_path=input_path_abs)

                    # Check again for input path read access, no-dereference logic (when applicable)
                    if not os.access(path=input_path_abs, mode=os.R_OK, follow_symlinks=self.follow_symlinks_false):
                        raise OSError('Path is not read accessible, access fix attempted!')
                else:
                    raise OSError('Path is not read accessible, access fix disabled!')

            # Check that input is not a symlink, when ignored
            if not allow_links and os.path.islink(input_path_abs):
                raise OSError('Path is a symbolic link!')

            # Check that input is a valid file or (broken, when allowed) symlink
            if not self._is_valid_path(input_path=input_path_abs, allow_broken_links=allow_links):
                raise OSError('Path is not a file!')

            # Check that input file is not too large (e.g. > 8GB), when applicable (i.e. non-link)
            if not os.path.islink(input_path_abs) and os.path.getsize(input_path_abs) > self.max_size:
                raise OSError(f'File is larger than {self._get_bytes_str(self.max_size)}!')

            return True
        except Exception as exception:
            self.logger.error('Failure while trying to access file %s: %s', input_path_abs, exception)

            return False

    def _fix_path_access(self, input_path: str) -> None:
        """ Attempt to fix path ownership and permissions under Posix/NT """
        input_path_abs: str = os.path.abspath(input_path)
        current_user: str = self._get_current_user()
        os_platform: str = os.name

        call_args: dict = {'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}

        try:
            if os_platform == 'posix':
                # Change Posix path ownership to current user, no-dereference logic
                _ = subprocess.call([f'chown -hfR {current_user}:{current_user} "{input_path_abs}"'], **call_args)

                # Change Posix path permissions to allow Read/Write
                _ = subprocess.call([f'chmod -fR +rw "{input_path_abs}"'], **call_args)
            elif os_platform == 'nt':
                # Change NT path ownership to current user as well as permissions to allow Read/Write
                _ = subprocess.call(['icacls', input_path_abs, '/grant', f'{current_user}:(OI)(CI)RW'], **call_args)

                # Remove NT FILE_ATTRIBUTE_READ_ONLY
                os.chmod(input_path_abs, stat.S_IWRITE)
            else:
                raise OSError(f'Unknown OS platform: "{os_platform}"!')
        except Exception as exception:
            self.logger.error('Failed to fix access of path %s: %s', input_path_abs, exception)

    def _delete_path(self, input_path: str) -> None:
        """ Delete path, if possible """
        if self._is_valid_path(input_path=input_path, allow_broken_links=True):
            # Check and fix input path write access, no-dereference logic (when applicable)
            if not os.access(path=input_path, mode=os.W_OK, follow_symlinks=self.follow_symlinks_false):
                self._fix_path_access(input_path=input_path)

            os.remove(input_path)

            self.logger.info('%s [Deleted]', input_path)
        else:
            self.logger.error('%s [Error]', input_path)

    @staticmethod
    def _is_valid_path(input_path: str, allow_broken_links: bool = False) -> bool:
        """ Check if path is a regular file or symlink (valid or broken) """
        input_path_abs: str = os.path.abspath(input_path)

        if os.path.lexists(input_path_abs):
            if not os.path.isdir(input_path_abs):
                if allow_broken_links:
                    return os.path.isfile(input_path_abs) or os.path.islink(input_path_abs)

                return os.path.isfile(input_path_abs)

        return False

    @staticmethod
    def _get_link_data(link_path: str) -> bytes:
        """ Get the target path of a symlink, encoded as bytes """
        # noinspection PyArgumentEqualDefault
        return os.readlink(link_path).encode(encoding='utf-8', errors='replace')

    @staticmethod
    def _get_dict_val_sum(input_dict: dict) -> int:
        """ Get sum of all list items within the values of a dictionary """
        return sum(len(values) for values in input_dict.values())

    @staticmethod
    def _get_percent_str(part: int, whole: int) -> str:
        """ Calculate the percentage that "part" represents of "whole" """
        return f'{part / whole if whole else 0:.2%}'

    @staticmethod
    def _get_bytes_str(bytes_count: int | float) -> str:
        """ Append size measurement unit to bytes value """
        for bytes_unit in ('bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'):
            if bytes_count < 1024:
                break

            bytes_count /= 1024

        return f'{bytes_count:.1f} {bytes_unit}'

    @staticmethod
    def _get_current_user() -> str:
        """ Get current user from terminal or system """
        try:
            current_user: str = os.getlogin()
        except OSError:
            current_user = getpass.getuser()

        return current_user
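
    # Detection pipeline (size -> partial hash -> full hash): only files that first match
    # in size and then in a first-chunk hash are fully hashed, so most non-duplicates are
    # ruled out without reading their entire contents.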
    def check_duplicates(self) -> None:
        """ Check for duplicate files/links at input paths """
        self.logger.info('Checking for duplicate files...')

        self._process_input_paths()

        self.logger.info('Files count: %d', self.initial_count)
        self.logger.info('Files size: %s', self._get_bytes_str(self.initial_size))

        self._get_hashes_partial()
        self._get_hashes_full()

        self.duplicate_count = self._get_dict_val_sum(input_dict=self.files_by_full_hash) - len(self.files_by_full_hash)

        self.logger.info('Duplicate files: %d', self.duplicate_count)

        self._process_duplicate_paths()

        self.duplicate_ratio = self._get_percent_str(part=self.duplicate_count, whole=self.initial_count)

        self.logger.info('Duplicate ratio: %d / %d files (%s)', self.duplicate_count,
                         self.initial_count, self.duplicate_ratio)

        self.size_reduction = self._get_percent_str(part=self.duplicate_size, whole=self.initial_size)

        self.logger.info('Size reduction: %s / %s (%s)', self._get_bytes_str(self.duplicate_size),
                         self._get_bytes_str(self.initial_size), self.size_reduction)

        self.logger.info('Finished checking for duplicate files!')


if __name__ == "__main__":
    parser: ArgumentParser = ArgumentParser()

    parser.add_argument('paths', nargs='*')
    parser.add_argument('-d', '--delete', help='delete duplicate files', action='store_true')
    parser.add_argument('-a', '--hash-algorithm', help='file detection hash algorithm', type=str)
    parser.add_argument('-c', '--chunk-size', help='file checking chunk size', type=int)
    parser.add_argument('-m', '--max-size', help='file checking maximum size', type=int)
    parser.add_argument('-l', '--log-level', help='message logging level', type=int)

    arguments: Namespace = parser.parse_args()

    if arguments.paths:
        Duplicates(in_paths=arguments.paths, is_delete=arguments.delete,
                   hash_alg=arguments.hash_algorithm, chunk_size=arguments.chunk_size,
                   max_size=arguments.max_size, log_level=arguments.log_level).check_duplicates()
    else:
        parser.print_help()
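
The Duplicates class can also be imported and driven programmatically. A minimal sketch, assuming the script above is saved as duplicates.py on the import path and using a hypothetical /data/photos directory:

from logging import DEBUG
from duplicates import Duplicates

# Dry run (is_delete=False): report duplicates without deleting anything
finder = Duplicates(in_paths=['/data/photos'], is_delete=False, hash_alg='sha256',
                    chunk_size=4096, log_level=DEBUG)
finder.check_duplicates()

# Summary statistics remain available on the instance after the run
print(finder.duplicate_count, finder.duplicate_ratio, finder.size_reduction)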