Skip to content

Instantly share code, notes, and snippets.

@coder-mike
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coder-mike/4b4fba6df4fdcb8e8f6b to your computer and use it in GitHub Desktop.
Save coder-mike/4b4fba6df4fdcb8e8f6b to your computer and use it in GitHub Desktop.
Script to copy files to a destination if they aren't already there
import hashlib
import os
import sys
import argparse
import shutil
import time
# Command-line interface: one source directory, one primary destination
# (the only place files are copied to) and any number of auxiliary
# destinations that are only consulted when checking for existing copies.
parser = argparse.ArgumentParser(description='Copies files to a primary destination directory when they aren\'t already in one of the destination directories')
parser.add_argument('source', help='Source directory to copy files from')
parser.add_argument('primdest', help='The primary destination (where to copy files to)')
parser.add_argument('auxdest', nargs='*', help='Additional destination directories to use when checking if file already exists')
args = parser.parse_args()

# All directories searched for existing copies, as absolute paths, with the
# primary destination first.
targetDirectories = [os.path.abspath(path) for path in args.auxdest]
primdest = os.path.abspath(args.primdest)
targetDirectories.insert(0, primdest)
sourceDirectory = os.path.abspath(args.source)

# Refuse to run when the source is also listed as a destination: every source
# file would then be found "already in the destination" and nothing copied.
# normcase makes the comparison case-insensitive on Windows (it is a no-op on
# POSIX), so "C:\Photos" and "c:\photos" are recognised as the same directory.
for dst in targetDirectories:
    if os.path.normcase(os.path.normpath(dst)) == os.path.normcase(os.path.normpath(sourceDirectory)):
        print("Oops. I think you used the source directory as one of the destinations. This will not copy any files since they will all appear as already in the destination")
        # This is a usage error, so report failure to the calling shell.
        sys.exit(1)
def md5_for_file(file_path, block_size=2**20):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is opened in binary mode and consumed *block_size* bytes at a
    time, so the whole file is never held in memory at once.
    """
    with open(file_path, 'rb') as stream:
        digest = hashlib.md5()
        # read() returns b'' at end of file, which ends the iteration.
        for chunk in iter(lambda: stream.read(block_size), b''):
            digest.update(chunk)
        return digest.hexdigest()
def includeFile(full_path):
    """Tell whether the file at *full_path* should be counted and hashed.

    Every file is accepted at present; this function is the single place to
    add a filter.
    """
    # Disabled example filter that would accept only JPEG files:
    #   return os.path.splitext(full_path)[1].upper() == '.JPG'
    accepted = True
    return accepted
def printFile(progress, full_path):
    """Show a one-line, self-overwriting progress indicator.

    The line is *progress* right-justified to 3 columns, followed by "% " and
    the last 74 characters of *full_path* (space-padded when shorter). It ends
    with a carriage return instead of a newline, so the next output starts on
    the same console row.

    The display is best effort: a path that cannot be encoded for the output
    stream is silently skipped. Any other exception propagates.
    """
    status = str(progress).rjust(3) + "% " + full_path.ljust(74)[-74:]
    try:
        print(status, end="\r")
    except UnicodeError:
        # Only encoding failures are ignored; a bare except here would also
        # swallow KeyboardInterrupt and SystemExit.
        pass
def printLine(text):
    """Print *text* left-aligned and space-padded to 79 columns.

    printFile() leaves a 79-character, carriage-return-terminated line on the
    console; padding to the same width overwrites its remains.
    """
    padded = "{0:<79}".format(text)
    print(padded)
# Pass 1 over the destinations: count the files so that the hashing pass can
# report a percentage.
print("Counting target directory files...")
# Maps content hash -> path of a destination file with that content; filled
# in by the hashing pass.
targetDirectoryFiles = {}
# Initialised once, before the loop, so the total covers every destination
# directory and not only the last one walked.
total_destination_count = 0
for targetDirectory in targetDirectories:
    printLine("%s..." % targetDirectory)
    for dirName, subdirList, fileList in os.walk(targetDirectory):
        for f in fileList:
            full_path = os.path.join(dirName, f)
            if includeFile(full_path):
                total_destination_count += 1
                printFile("---", full_path)
printLine("Target directories contain %i files" % total_destination_count)
# Pass 2 over the destinations: hash every file and remember one path per
# distinct content hash in targetDirectoryFiles.
printLine("Calculating hashes of target files...")
progress_count = 0
targetDuplicates = 0
for targetDirectory in targetDirectories:
    printLine("%s..." % targetDirectory)
    for dirName, subdirList, fileList in os.walk(targetDirectory):
        for f in fileList:
            full_path = os.path.join(dirName, f)
            if not includeFile(full_path):
                continue
            # The total comes from an earlier counting pass and may be zero
            # or smaller than the number of files seen here, so avoid a
            # division by zero and cap the display at 100%.
            if total_destination_count:
                progress = min(100, progress_count * 100 // total_destination_count)
            else:
                progress = 0
            printFile(progress, full_path)
            digest = md5_for_file(full_path)
            if digest in targetDirectoryFiles:
                targetDuplicates += 1
            # The most recently hashed path wins for a given content hash.
            targetDirectoryFiles[digest] = full_path
            progress_count += 1
printLine("Completed calculating hashes of existing target files")
printLine("")
printLine("There were %i duplicates files found in the target directories (These will not be touched)" % targetDuplicates)
printLine("")
# Pass 1 over the source: count the files so that the hashing pass can report
# a percentage.
printLine("Counting files in source directory %s..." % sourceDirectory)
total_source_count = 0
for root, _subdirs, names in os.walk(sourceDirectory):
    for candidate in (os.path.join(root, name) for name in names):
        if not includeFile(candidate):
            continue
        total_source_count += 1
        printFile("---", candidate)
# Pass 2 over the source: hash every file and sort it into one of three
# buckets - duplicate within the source, already present in a destination,
# or new (queued in filesToCopy).
sourceDirectoryFiles = {}
sourceDuplicates = 0
filesAlreadyInDestination = 0
filesToCopy = []
printLine("Source directory contains %i files" % total_source_count)
printLine("Calculating hashes of source files...")
progress_count = 0
for root, _subdirs, names in os.walk(sourceDirectory):
    for name in names:
        full_path = os.path.join(root, name)
        if not includeFile(full_path):
            continue
        printFile(progress_count * 100 // total_source_count, full_path)
        digest = md5_for_file(full_path)
        if digest in sourceDirectoryFiles:
            # Same content was already seen earlier in the source tree.
            sourceDuplicates += 1
        elif digest in targetDirectoryFiles:
            filesAlreadyInDestination += 1
        else:
            filesToCopy.append(full_path)
        sourceDirectoryFiles[digest] = full_path
        progress_count += 1
printLine("Completed calculating hashes of source files")
printLine("")
printLine("%i duplicate files found in the source directory, and will *NOT* be copied" % sourceDuplicates)
printLine("%i files already exist in the destination, and will *NOT* be copied" % filesAlreadyInDestination)
printLine("%i unique files are not in the destination, and *WILL* be copied" % len(filesToCopy))
printLine("The destination directory for these files is %s" % primdest)
printLine("")
# Final pass: copy every queued file into the primary destination, keeping
# its path relative to the source directory.
printLine("Copying files...")
# An interactive "are you sure?" prompt used to sit here; it is disabled, so
# copying starts immediately.
totalToCopy = len(filesToCopy)
progress_count = 0
for fileToCopy in filesToCopy:
    progress = progress_count * 100 // totalToCopy
    relPath = os.path.relpath(fileToCopy, sourceDirectory)
    # Sanity check that the file really lies inside the source directory.
    # Compare whole path components: a plain prefix test would reject a
    # legitimate top-level file whose name merely starts with "..", and an
    # assert would disappear under "python -O".
    if relPath == os.pardir or relPath.startswith(os.pardir + os.sep):
        raise RuntimeError("%s is not inside the source directory %s" % (fileToCopy, sourceDirectory))
    dest_path = os.path.join(primdest, relPath)
    directory = os.path.dirname(dest_path)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(directory, exist_ok=True)
    printFile(progress, dest_path)
    # NOTE(review): copy2 overwrites whatever is already at dest_path. Only
    # content missing from the destinations is queued, so an existing file at
    # this path has different content and would be replaced - confirm that
    # this is intended.
    shutil.copy2(fileToCopy, dest_path)
    progress_count += 1
printLine("Done")
printLine("%i files copied" % totalToCopy)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment