Last active
October 1, 2018 21:19
-
-
Save raspi/325ecb78610abbff9d869747ce2f2af1 to your computer and use it in GitHub Desktop.
Move files based on file list in a md5 checksum file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Move files based on the file list in an MD5 checksum file
# (c) Pekka Järvinen 2017-
import argparse
import hashlib
import logging
import os
import shutil
import sys

# Module-level logger; it is configured by logging.basicConfig() when the
# script is run directly.
log = logging.getLogger(__name__)

# Program metadata used to build the argparse help texts below.
__VERSION__ = "0.0.1"
__AUTHOR__ = "Pekka Järvinen"
__YEAR__ = 2017
__DESCRIPTION__ = "Move files based on file list in a md5 checksum file. Version {0}.".format(__VERSION__)
# "%(prog)s" is left in place on purpose: argparse substitutes the program
# name when it renders the epilog / usage text.
__EPILOG__ = "%(prog)s v{0} (c) {1} {2}-".format(__VERSION__, __AUTHOR__, __YEAR__)
# Lines joined into the parser's ``usage`` text (an example invocation framed
# by separator rules).
__EXAMPLES__ = [
    '',
    '-' * 60,
    '%(prog)s --checksums filelist.md5 --directory /home/user/sorted --source /home/user/unsorted',
    '-' * 60,
]
class FullPaths(argparse.Action):
    """argparse action that stores the option value as an absolute path.

    A leading ``~`` / ``~user`` is expanded first, then the result is made
    absolute (relative paths resolve against the current working directory).
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the normalised path(s) on *namespace* under ``self.dest``.

        *values* is a single string, or a list of strings when the argument
        was declared with ``nargs``; a list is normalised element by element
        so the action does not fail with ``TypeError`` in that case.
        """
        if isinstance(values, (list, tuple)):
            resolved = [self._resolve(item) for item in values]
        else:
            resolved = self._resolve(values)
        setattr(namespace, self.dest, resolved)

    @staticmethod
    def _resolve(path):
        """Return *path* with ``~`` expanded and converted to an absolute path."""
        return os.path.abspath(os.path.expanduser(path))
def is_dir(dirname: str) -> str:
    """argparse ``type=`` validator: accept *dirname* only if it is a directory.

    argparse applies ``type=`` before the argument's action runs, so a value
    such as ``~/sorted`` that reaches the program unexpanded (quoted, or
    written as ``--opt=~/sorted``) is checked here first. ``~`` is therefore
    expanded for the check; the value itself is returned unchanged so that
    any later normalisation sees the original string.

    Args:
        dirname: Path given on the command line.

    Returns:
        *dirname*, unmodified.

    Raises:
        argparse.ArgumentTypeError: If the path is not an existing directory.
    """
    if not os.path.isdir(os.path.expanduser(dirname)):
        raise argparse.ArgumentTypeError("'{0}' is not a directory".format(dirname))
    return dirname
def parse_checksum_line(line):
    """Parse one line of an md5sum-style checksum file.

    Expected layout is ``<32 hex digits><space><mode><name>`` where *mode* is
    a space (text mode) or ``*`` (binary mode); the mode character is not
    part of the file name. A single-space separator is tolerated as well.

    Args:
        line: One raw line, with or without its trailing newline.

    Returns:
        ``(checksum, name)`` with the checksum lower-cased and the name
        stripped of surrounding whitespace, or ``None`` when the line is
        blank, does not start with a 32-digit hex checksum, or has no name.
    """
    checksum, _, name = line.strip().partition(" ")
    checksum = checksum.lower()
    # hashlib's hexdigest() is always 32 lowercase hex digits; any other key
    # could never match a computed checksum.
    if len(checksum) != 32 or any(c not in "0123456789abcdef" for c in checksum):
        return None
    if name[:1] in (" ", "*"):
        name = name[1:]
    name = name.strip()
    if not name:
        return None
    return checksum, name


def md5_of_file(path, chunk_size=1024 * 1024):
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is read in *chunk_size*-byte pieces so large files are never
    held in memory in full.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def main(argv=None):
    """Move files from the source tree to the locations named in a checksum file.

    Every regular file found under ``--source`` is MD5-hashed; when the hash
    is listed in the ``--checksums`` file, the file is moved to
    ``<--directory>/<name from checksum file>``.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: ``1`` when the checksum file yields no usable
        entries, ``0`` otherwise.
    """
    logging.basicConfig(
        format='%(asctime)s [%(levelname)s]: %(message)s',
        stream=sys.stdout,
        level=logging.INFO,
    )

    parser = argparse.ArgumentParser(
        description=__DESCRIPTION__,
        epilog=__EPILOG__,
        usage=os.linesep.join(__EXAMPLES__),
    )
    # The placeholder text belongs in ``metavar``; embedding it in the option
    # string only worked through argparse's prefix-abbreviation matching.
    # The checksum file is only read, so it is opened read-only.
    parser.add_argument('--checksums', '-f', metavar='<file.md5>',
                        type=argparse.FileType('r', encoding='utf8'), dest='file',
                        required=True, help='MD5 Checksum file.')
    parser.add_argument('--directory', '-d', metavar='<directory>', action=FullPaths,
                        type=is_dir, dest='directory', required=True,
                        help='Target directory to move files to. Adds possible directory name from checksum file.')
    parser.add_argument('--source', '-s', metavar='<directory>', action=FullPaths,
                        type=is_dir, dest='sourcedir', required=True,
                        help='Source directory to read files from recursively.')
    parser.add_argument('--verbose', '-v', action='count', required=False, default=0, dest='verbose',
                        help="Be verbose. -vvv.. Be more verbose.")
    args = parser.parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        log.info("Being verbose")

    # checksum -> absolute target path; the first occurrence of a checksum wins.
    checksums = {}
    with args.file as f:
        log.info("Reading file '%s'", f.name)
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue
            entry = parse_checksum_line(line)
            if entry is None:
                log.warning("Skipping malformed line %d in '%s'", lineno, f.name)
                continue
            checksum, fname = entry
            # NOTE(review): os.path.join lets an absolute name or '..' in the
            # checksum file point outside the target directory - confirm the
            # checksum file is trusted.
            fname = os.path.join(args.directory, fname)
            log.debug("Adding '%s' '%s'", checksum, fname)
            if checksum not in checksums:
                checksums[checksum] = fname
            else:
                log.error("Checksum '%s' already exists in checksum list.", checksum)

    if not checksums:
        log.critical("No checksums.")
        return 1

    log.info("Scanning directory '%s'..", args.sourcedir)
    for dirpath, _dirnames, filenames in os.walk(args.sourcedir):
        for filename in filenames:
            fpath = os.path.join(os.path.abspath(dirpath), filename)
            log.debug("%s", fpath)
            log.debug("Checksumming file: '%s'", fpath)
            try:
                csum = md5_of_file(fpath)
            except OSError as err:
                # e.g. broken symlink or missing permission: skip this file
                # instead of aborting the whole run.
                log.error("Cannot read '%s': %s", fpath, err)
                continue

            targetfile = checksums.get(csum)
            if targetfile is None:
                log.debug("Checksum not found: %s '%s'", csum, fpath)
                continue

            log.info("Checksum found: %s '%s'", csum, fpath)
            if os.path.isfile(targetfile):
                if os.path.samefile(fpath, targetfile):
                    # Happens on re-runs or when the target directory lies
                    # inside the source tree; nothing to do.
                    log.debug("Already in place: '%s'", fpath)
                else:
                    log.error("File exists: '%s'", targetfile)
                continue

            os.makedirs(os.path.dirname(targetfile), exist_ok=True)
            log.info("Moving: '%s' -> '%s'", fpath, targetfile)
            shutil.move(fpath, targetfile)

    log.info("Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment