#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Move files based on file list in a md5 checksum file
# (c) Pekka Järvinen 2017-
import logging | |
log = logging.getLogger(__name__) | |
import os | |
import sys | |
import shutil | |
import argparse | |
import hashlib | |
# Script metadata, interpolated into the argparse description, epilog and usage text.
__VERSION__ = "0.0.1"
__AUTHOR__ = "Pekka Järvinen"
__YEAR__ = 2017

# Shown at the top of --help output.
__DESCRIPTION__ = "Move files based on file list in a md5 checksum file. Version {0}.".format(__VERSION__)

# Shown at the bottom of --help output; '%(prog)s' is left for argparse to expand.
__EPILOG__ = "%(prog)s v{0} (c) {1} {2}-".format(__VERSION__, __AUTHOR__, __YEAR__)

# Example invocation framed by horizontal rules; joined into the usage text by the script body.
__EXAMPLES__ = [
    '',
    '-' * 60,
    '%(prog)s --checksums filelist.md5 --directory /home/user/sorted --source /home/user/unsorted',
    '-' * 60,
]
class FullPaths(argparse.Action):
    """argparse action that stores the option value as an absolute path.

    A leading ``~`` is expanded to the user's home directory before the path
    is made absolute.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the expanded, absolute form of ``values`` on ``namespace``.

        ``values`` is a single string for a plain option; argparse passes a
        list when the option is declared with ``nargs``, in which case every
        element is converted and a list is stored.
        """
        if isinstance(values, (list, tuple)):
            resolved = [os.path.abspath(os.path.expanduser(v)) for v in values]
        else:
            resolved = os.path.abspath(os.path.expanduser(values))
        setattr(namespace, self.dest, resolved)
def is_dir(dirname: str) -> str:
    """argparse ``type=`` validator: accept only paths of existing directories.

    argparse applies ``type=`` before it invokes the option's action, so a
    value such as ``~/data`` reaches this function unexpanded. The literal
    path is tried first; if it is not a directory, the ``~``-expanded form is
    tried as well.

    :param dirname: path given on the command line.
    :return: ``dirname`` unchanged when it names a directory, otherwise its
        ``~``-expanded form when that names a directory.
    :raises argparse.ArgumentTypeError: when neither form is a directory.
    """
    if os.path.isdir(dirname):
        return dirname

    expanded = os.path.expanduser(dirname)
    if os.path.isdir(expanded):
        return expanded

    raise argparse.ArgumentTypeError("'{0}' is not a directory".format(dirname))
def parse_checksum_line(line):
    """Parse one line of an md5sum-style checksum list.

    Accepted layouts are ``<md5> <name>``, ``<md5>  <name>`` (md5sum text
    mode) and ``<md5> *<name>`` (md5sum binary mode); the ``*`` marker is not
    part of the file name and is dropped.

    :param line: raw line, with or without its trailing newline.
    :return: ``(checksum, name)`` with the checksum lower-cased, or ``None``
        when the line is blank or does not start with a 32-digit hex digest
        followed by a name.
    """
    parts = line.strip().split(None, 1)
    if len(parts) != 2:
        return None

    checksum, fname = parts
    checksum = checksum.lower()
    if len(checksum) != 32 or any(c not in "0123456789abcdef" for c in checksum):
        return None

    if fname.startswith("*"):
        fname = fname[1:]
    if not fname:
        return None
    return checksum, fname


def file_md5(path, chunk_size=1024 * 1024):
    """Return the lower-case hex MD5 digest of the file at ``path``.

    The file is read in ``chunk_size``-byte pieces so large files are never
    held in memory as a whole.

    :raises OSError: when the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as stream:
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def load_checksums(lines, target_dir):
    """Build a ``{checksum: target path}`` mapping from checksum-list lines.

    Each listed name is joined onto ``target_dir``. Blank lines are ignored,
    malformed lines are skipped with a warning, and when a checksum occurs
    more than once the first entry wins and an error is logged.
    """
    checksums = {}
    for lineno, line in enumerate(lines, 1):
        parsed = parse_checksum_line(line)
        if parsed is None:
            if line.strip():
                log.warning("Skipping malformed line %d: '%s'", lineno, line.strip())
            continue

        checksum, fname = parsed
        target = os.path.join(target_dir, fname)
        log.debug("Adding '%s' '%s'", checksum, target)
        if checksum in checksums:
            log.error("Checksum '%s' already exists in checksum list.", checksum)
            continue
        checksums[checksum] = target
    return checksums


def move_matching_files(source_dir, checksums):
    """Walk ``source_dir`` and move every file whose MD5 is in ``checksums``.

    A file is moved to the target path stored for its checksum; missing
    parent directories are created. Existing targets are never overwritten.
    Files that cannot be read or moved are logged and skipped so that one
    bad file does not abort the whole run.
    """
    for dirpath, _dirnames, files in os.walk(source_dir):
        for name in files:
            fpath = os.path.join(os.path.abspath(dirpath), name)
            log.debug("Checksumming file: '%s'", fpath)
            try:
                csum = file_md5(fpath)
            except OSError as exc:
                log.error("Cannot read '%s': %s", fpath, exc)
                continue

            if csum not in checksums:
                log.debug("Checksum not found: %s '%s'", csum, fpath)
                continue

            log.info("Checksum found: %s '%s'", csum, fpath)
            targetfile = checksums[csum]
            # exists() rather than isfile(): shutil.move() into an existing
            # directory would place the file inside it under its old name.
            if os.path.exists(targetfile):
                log.error("File exists: '%s'", targetfile)
                continue

            log.info("Moving: '%s' -> '%s'", fpath, targetfile)
            try:
                os.makedirs(os.path.dirname(targetfile), exist_ok=True)
                shutil.move(fpath, targetfile)
            except OSError as exc:
                log.error("Cannot move '%s' -> '%s': %s", fpath, targetfile, exc)


def main(argv=None):
    """Command-line entry point.

    :param argv: argument list without the program name; ``None`` lets
        argparse read ``sys.argv``.
    :return: process exit status, 0 on success and 1 when the checksum file
        yields no usable entries.
    """
    logging.basicConfig(
        format='%(asctime)s [%(levelname)s]: %(message)s',
        stream=sys.stdout,
        level=logging.INFO,
    )

    parser = argparse.ArgumentParser(
        description=__DESCRIPTION__,
        epilog=__EPILOG__,
        usage="\n".join(__EXAMPLES__),
    )
    # The placeholder shown in --help belongs in metavar; embedding it in the
    # option string makes the flag match only through prefix abbreviation.
    # The checksum list is only read, so it is opened read-only.
    parser.add_argument('--checksums', '-f', metavar='<file.md5>',
                        type=argparse.FileType('r', encoding='utf8'), dest='file',
                        required=True, help='MD5 Checksum file.')
    parser.add_argument('--directory', '-d', metavar='<directory>', action=FullPaths, type=is_dir,
                        dest='directory', required=True,
                        help='Target directory to move files to. Adds possible directory name from checksum file.')
    parser.add_argument('--source', '-s', metavar='<directory>', action=FullPaths, type=is_dir,
                        dest='sourcedir', required=True,
                        help='Source directory to read files from recursively.')
    parser.add_argument('--verbose', '-v', action='count', required=False, default=0, dest='verbose',
                        help="Be verbose. -vvv.. Be more verbose.")
    args = parser.parse_args(argv)

    if args.verbose > 0:
        logging.getLogger().setLevel(logging.DEBUG)
        log.info("Being verbose")

    with args.file as f:
        log.info("Reading file '%s'", f.name)
        checksums = load_checksums(f, args.directory)

    if not checksums:
        log.critical("No checksums.")
        return 1

    log.info("Scanning directory '%s'..", args.sourcedir)
    move_matching_files(args.sourcedir, checksums)
    log.info("Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment