Skip to content

Instantly share code, notes, and snippets.

@debedb
Last active December 27, 2017 02:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debedb/046a0b0dcee894da0d3e8b762957a87f to your computer and use it in GitHub Desktop.
Save debedb/046a0b0dcee894da0d3e8b762957a87f to your computer and use it in GitHub Desktop.
Find duplicate files
#!/usr/bin/python
import pprint
import os
import sys
import hashlib
import sets
import argparse
import signal
# Maps a file size in bytes to the list of paths found with that size.
lengths = {}
# Rebound to an empty set by main(); nothing in this file reads it afterwards.
visited = None
# Running total of files recorded by visit().
processedCount = 0
# Maps a lower-cased file extension (as split off by os.path.splitext) to
# the number of files recorded with that extension.
processedCountsByExt = {}
def err(s):
    """Write a diagnostic message to standard error, followed by a newline.

    Args:
        s: The message. Normally a string; any other object (for example
            an exception instance) is rendered with ``%s`` formatting
            instead of raising ``TypeError``.
    """
    # Format first so non-string arguments are accepted, then emit the
    # message and its newline with a single write.
    sys.stderr.write("%s\n" % (s,))
def visit(arg, dirname, fnames):
    """Record the size of every file listed for one directory.

    The signature matches the callback form used with ``os.path.walk``.
    Sub-directories are descended into directly, so a single call covers
    the whole tree below ``dirname``.

    Args:
        arg: Mutable list of paths already handled. Every path seen is
            appended, and a path that is already present is skipped.
        dirname: Directory that the entries in ``fnames`` belong to.
        fnames: Names of the entries inside ``dirname``.

    Side effects:
        Appends file paths to the module-level ``lengths`` map (keyed by
        file size) and updates ``processedCount`` and
        ``processedCountsByExt``. Files whose extension is listed in
        ``args.ignoreExtensions`` and files of size 0 are not recorded.
        Problems are reported through ``err`` instead of being raised.
    """
    global processedCount

    if not fnames:
        return

    # str.endswith accepts a tuple, so build the suffixes once per call.
    ignored = tuple("." + x for x in (args.ignoreExtensions or ()))

    for fname in fnames:
        cur = os.path.join(dirname, fname)
        if cur in arg:
            continue
        arg.append(cur)

        if os.path.isdir(cur):
            # Recurse directly rather than through os.path.walk: that
            # function no longer exists in Python 3 and it silently hid
            # directories that could not be listed.
            try:
                visit(arg, cur, os.listdir(cur))
            except Exception as e:
                err("Error visiting %s: %s" % (cur, e))
            continue

        if ignored and cur.endswith(ignored):
            # Skip only this file; the remaining entries of the directory
            # still have to be examined.
            continue

        try:
            info = os.stat(cur)
        except (OSError, ValueError) as e:
            err("Error reading %s: %s" % (cur, e))
            continue

        sz = info.st_size
        if sz == 0:
            err("Ignoring %s of size 0" % cur)
            continue

        lengths.setdefault(sz, []).append(cur)

        ext = os.path.splitext(cur)[1].lower()
        processedCount += 1
        processedCountsByExt[ext] = processedCountsByExt.get(ext, 0) + 1
        if processedCount % 10000 == 0:
            err("Processed %s files" % processedCount)
def _hashFile(path, chunkSize=1 << 20):
    """Return the SHA-256 hex digest of the file at ``path``.

    The file is opened in binary mode and consumed in pieces of at most
    ``chunkSize`` bytes, so it is never held in memory as a whole.
    """
    sha = hashlib.sha256()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunkSize)
            if not chunk:
                break
            sha.update(chunk)
    return sha.hexdigest()


def checkDupes():
    """Print every group of files in ``lengths`` that have identical content.

    Only files sharing a size are compared. For each size with more than
    one file, the files are hashed with SHA-256 and every group of two or
    more paths with the same digest is printed on one line, each path in
    double quotes and separated by spaces.

    Side effects:
        Removes from ``lengths`` every size that has exactly one file.
        Files that cannot be read are reported through ``err`` and left
        out of the comparison.
    """
    # Snapshot the keys: entries are deleted from ``lengths`` in the loop.
    for size in list(lengths):
        files = lengths[size]
        if len(files) == 1:
            # A unique size cannot have a duplicate.
            del lengths[size]
            continue

        byDigest = {}
        seenPaths = set()
        for fname in files:
            if fname in seenPaths:
                # The same path listed twice is not a duplicate of itself.
                continue
            seenPaths.add(fname)
            try:
                digest = _hashFile(fname)
            except (IOError, OSError) as e:
                err("Error reading %s: %s" % (fname, e))
                continue
            byDigest.setdefault(digest, []).append(fname)

        for names in byDigest.values():
            if len(names) > 1:
                print(" ".join("\"%s\"" % n for n in names))
def intHandler(signum, frame):
    """Handle an interrupt: report what was gathered so far, then exit.

    Args:
        signum: Number of the signal being handled, as passed by the
            ``signal`` module.
        frame: Interrupted stack frame, as passed by the ``signal``
            module (unused).

    Exits the process with status 1 after ``finalize()`` returns.
    """
    # finalize() hashes candidate files and may take a while. Put the
    # default disposition back first so that a second interrupt ends the
    # process instead of re-entering this handler and starting the
    # summary all over again.
    signal.signal(signum, signal.SIG_DFL)
    err("Interrupted...")
    finalize()
    sys.exit(1)
def finalize():
    """Emit the run statistics on stderr, then report duplicate files.

    Writes the total number of processed files and the per-extension
    counts to standard error, and finally calls ``checkDupes()``.
    """
    err("Processed %s: " % processedCount)
    pprint.pprint(processedCountsByExt, stream=sys.stderr, indent=2)
    checkDupes()
def main():
    """Parse the command line, scan the directory tree and report duplicates.

    Command line:
        dir: Directory to process.
        --x ext1,ext2: Comma-separated list of extensions (without the
            dot) to ignore.

    The parsed arguments are stored in the module-level ``args``, which
    ``visit`` reads. ``intHandler`` is installed for SIGINT before the
    scan starts. If the directory cannot be listed, the argument parser
    reports the error and exits.
    """
    global args, visited

    parser = argparse.ArgumentParser()
    parser.add_argument('dir',
                        metavar='dir',
                        nargs=1,
                        help='Directory to process')
    parser.add_argument('--x',
                        dest='ignoreExtensions',
                        metavar='ext1,ext2',
                        nargs=1,
                        help='Comma-separated list of extensions (without .) to ignore')
    args = parser.parse_args()

    signal.signal(signal.SIGINT, intHandler)

    if args.ignoreExtensions:
        # Drop empty items (e.g. from a trailing comma): an empty extension
        # would otherwise match every file name that ends with a dot.
        pieces = [x.strip() for x in args.ignoreExtensions[0].split(",")]
        args.ignoreExtensions = [x for x in pieces if x]
        err("Will ignore extensions %s" % str(args.ignoreExtensions))

    visited = set()

    top = args.dir[0]
    try:
        names = os.listdir(top)
    except OSError as e:
        # A mistyped directory used to be treated as an empty one.
        parser.error("Cannot read directory %s: %s" % (top, e))

    # visit() descends into sub-directories itself.
    visit([], top, names)
    finalize()
# Entry point: run main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment