@itsPG · Created August 12, 2019 12:38

#!/usr/bin/env python2
import sys
import os
import hashlib
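
# Find and optionally delete duplicate files under the paths given on the
# command line. Written for Python 2 (print statements, raw_input()).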

def chunk_reader(fobj, chunk_size=1024):
    """Generator that reads a file in chunks of bytes."""
    while True:
        chunk = fobj.read(chunk_size)
        if not chunk:
            return
        yield chunk

def get_hash(filename, first_chunk_only=False, hash_algo=hashlib.sha1):
    """Hash a file's contents - or just its first 1024 bytes - with hash_algo."""
    hashobj = hash_algo()
    with open(filename, 'rb') as file_object:
        if first_chunk_only:
            hashobj.update(file_object.read(1024))
        else:
            for chunk in chunk_reader(file_object):
                hashobj.update(chunk)
    return hashobj.digest()
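
# Strategy: three increasingly expensive passes - (1) group files by size,
# (2) hash the first 1024 bytes of same-size files, (3) fully hash files
# whose first-1k hashes collide. Only full-hash collisions count as duplicates.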
def check_for_duplicates(paths):
    """Walk each path, find duplicate files, and return the redundant copies."""
    hashes_by_size = {}
    hashes_on_1k = {}
    hashes_full = {}
    ret = []
    for path in paths:
        for dirpath, dirnames, filenames in os.walk(path):
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                try:
                    # if the target is a (soft) symlink, dereference it -
                    # point at the actual target file
                    full_path = os.path.realpath(full_path)
                    file_size = os.path.getsize(full_path)
                except OSError:
                    # not accessible (permissions, etc.) - skip it
                    continue
                # group files by size; only same-size files can be duplicates
                hashes_by_size.setdefault(file_size, []).append(full_path)
    # For all files with the same size, get the hash of their first 1024 bytes
    for files in hashes_by_size.values():
        if len(files) < 2:
            continue  # this file size is unique, no need to spend CPU cycles on it
        for filename in files:
            try:
                small_hash = get_hash(filename, first_chunk_only=True)
            except (IOError, OSError):
                # the file might have become inaccessible since the walk
                continue
            hashes_on_1k.setdefault(small_hash, []).append(filename)
    # For all files sharing a first-1k hash, hash the full contents -
    # full-hash collisions are duplicates
    for files in hashes_on_1k.values():
        if len(files) < 2:
            continue  # this first-1k hash is unique, no need to spend CPU cycles on it
        for filename in files:
            try:
                full_hash = get_hash(filename, first_chunk_only=False)
            except (IOError, OSError):
                # the file might have become inaccessible since the walk
                continue
            duplicate = hashes_full.get(full_hash)
            if duplicate:
                print "Duplicate found: %s and %s" % (filename, duplicate)
                ret.append(filename)
            else:
                hashes_full[full_hash] = filename
    return ret
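
# Note: check_for_duplicates() keeps the first file seen with a given full
# hash as the canonical copy and returns only the later-seen duplicates, so
# deleting everything it returns always leaves one copy of each file behind.
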
def check_dir(paths):
    print '=====>', paths
    ret = check_for_duplicates(paths)
    print ret
    if len(ret) > 0:
        print 'Remove these files? [Y/N]'
        choice = raw_input().lower()
        if choice == 'y':
            for i in ret:
                os.remove(i)
            print 'removed'

if sys.argv[1:]:
    for i in sys.argv[1:]:
        check_dir([i])
else:
    print "Please pass the paths to check as parameters to the script"