@conrad784
Created February 21, 2018 09:13
Python script to scan a directory for duplicate files, keeping only the latest copy of each.
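A typical invocation, as a sketch (the filename dedup.py is hypothetical; save the script under any name): python3 dedup.py --cores 4 --dry-run ~/Downloads walks the tree and reports which duplicates would be deleted without removing anything; drop --dry-run to actually delete after the confirmation prompt.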
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (C) 2018 Conrad Sachweh
"""NAME
%(prog)s - <description>
SYNOPSIS
%(prog)s [--help]
DESCRIPTION
none
FILES
none
SEE ALSO
nothing
DIAGNOSTICS
none
BUGS
none
AUTHOR
Conrad Sachweh, conrad@csachweh.de
"""
#--------- Classes, Functions, etc ---------------------------------------------
def checksum(filename, algo="sha256", block_size=65536):
    """
    Read the file in chunks (65536 bytes = 64 KiB per chunk) and return
    (filename, hexdigest).
    """
    import hashlib
    hashFunction = getattr(hashlib, algo)()
    with open(filename, 'rb', buffering=0) as f:
        for block in iter(lambda: f.read(block_size), b''):
            hashFunction.update(block)
    return filename, hashFunction.hexdigest()
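# Example (hypothetical path; digest elided): checksum("/tmp/a.txt") returns
#   ("/tmp/a.txt", "<64-character sha256 hexdigest>")
# Returning the filename alongside the digest lets pool.imap_unordered in the
# main section pair each result with the file it came from.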
# functions from https://stackoverflow.com/a/8558403
def walk_files(topdir):
    """Yield the full pathname of each file in the tree under topdir."""
    import os
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname
def files_to_process(topdir, size_limit=10000000):
    """
    Yield the full pathname of only the files we want to process.
    size_limit is in bytes.
    """
    import os
    from stat import S_ISREG
    for fname in walk_files(topdir):
        try:
            sr = os.stat(fname)
        except OSError:
            pass
        else:
            # if it is a regular file and small enough, we want to process it
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname
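# Example (illustrative): files_to_process("/tmp", size_limit=1024) yields only
# regular files of at most 1 KiB; entries that cannot be stat'ed are skipped.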
def get_file_info(files):
    """Map each path to its size in bytes and its modification time."""
    import os
    info = {}
    for item in files:
        st = os.stat(item)
        info[item] = {"size": st.st_size, "timestamp": st.st_mtime}
    return info
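# Shape of the returned mapping (values illustrative):
#   {"/tmp/a.txt": {"size": 5, "timestamp": 1519203180.0}}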
def get_filesize(files):
    """Return the combined size of the given files in bytes."""
    finfo = get_file_info(files)
    size = 0
    for fname, info in finfo.items():
        size += info.get("size")
    return size
def get_latest(finfo):
    """Return (path, mtime) of the most recently modified file in finfo."""
    mtime = 0
    latestfile = None
    for fname, info in finfo.items():
        if info.get("timestamp") > mtime:
            mtime = info.get("timestamp")
            latestfile = fname
    return latestfile, mtime
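# Example (illustrative): given {"a": {"timestamp": 100}, "b": {"timestamp": 200}},
# get_latest returns ("b", 200); the most recently modified path wins.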
def sizeof_fmt(num, suffix='B'):
    """
    Human-readable size, from https://stackoverflow.com/a/1094933
    """
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)
#-------------------------------------------------------------------------------
# Main
#-------------------------------------------------------------------------------
if __name__=="__main__":
import sys, os, glob
from multiprocessing import Pool, freeze_support
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-r', '--recursive', action='store_true',
help="do this recursive")
parser.add_argument('-v', '--verbose', action='count', default=0,
help='show more verbose output')
parser.add_argument('--cores', action='store', default=1, type=int,
help='hashing is usually I/O bound, but feel free to increase this for high performance storage devices or only small files')
parser.add_argument('--dry-run', action='store_true',
help="only index directory, don't actually delete file")
parser.add_argument('directory', nargs=1, help='search directory')
args = parser.parse_args()
if args.verbose:
print("[INFO]", args)
# fix directory for invalid inputs
mdir = args.directory[0]
if not mdir.endswith("/"):
mdir = mdir + "/"
freeze_support()
nprocesses = args.cores
pool = Pool(processes=nprocesses)
    from collections import defaultdict
    allFiles = defaultdict(list)

    # get the files to look at
    sizeLimit = 2000000000  # ~1.9 GiB
    files = files_to_process(mdir, sizeLimit)
    print("Scanning directory {}".format(mdir))

    # initialize the progress bar (this walks the tree a second time just to count)
    total = len(list(files_to_process(mdir, sizeLimit)))
    from tqdm import tqdm
    pbar = tqdm(total=total)

    # calculate checksums in parallel
    for fname, hexdigest in pool.imap_unordered(checksum, files):
        pbar.update(1)
        allFiles[hexdigest].append(fname)
    pbar.close()
print("Evaluating for duplicates")
deleteFiles = []
for hexdigest, files in allFiles.items():
if len(files) > 1:
finfo = get_file_info(files)
latestfile = get_latest(finfo)
for item in files:
if not latestfile[0] == item:
deleteFiles.append(item)
print("Going to delete {} files. You will gain {}.".format(len(deleteFiles), sizeof_fmt(get_filesize(deleteFiles))))
if deleteFiles:
print(deleteFiles)
decision = input('Do you really want to delete those files? [y/N] ')
if decision.startswith("y") and not args.dry_run:
for item in deleteFiles:
os.remove(item)
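The helpers also compose as a small library. A minimal sketch of using them directly, assuming the script is saved as dedup.py (that module name, the directory path, and the size limit are all illustrative):

from collections import defaultdict
from dedup import checksum, files_to_process, get_file_info, get_latest

# group files by content digest
groups = defaultdict(list)
for path in files_to_process("/tmp/photos", size_limit=10000000):
    name, digest = checksum(path)
    groups[digest].append(name)

# for each group of identical files, keep only the newest copy
for digest, paths in groups.items():
    if len(paths) > 1:
        newest, mtime = get_latest(get_file_info(paths))
        print("keep:", newest, "duplicates:", [p for p in paths if p != newest])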