Skip to content

Instantly share code, notes, and snippets.

@raspi
Last active October 1, 2018 21:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raspi/325ecb78610abbff9d869747ce2f2af1 to your computer and use it in GitHub Desktop.
Save raspi/325ecb78610abbff9d869747ce2f2af1 to your computer and use it in GitHub Desktop.
Move files based on file list in a md5 checksum file
#!/usr/bin/env python3
# -*- encoding: utf8 -*-
# Move files based on file list in a md5 checksum file
# (c) Pekka Järvinen 2017-
import logging
log = logging.getLogger(__name__)
import os
import sys
import shutil
import argparse
import hashlib
# Program metadata; these values are combined into the help strings below.
__VERSION__ = "0.0.1"
__AUTHOR__ = u"Pekka Järvinen"
__YEAR__ = 2017

# Description text handed to argparse.
__DESCRIPTION__ = (
    u"Move files based on file list in a md5 checksum file. Version "
    + __VERSION__
    + u"."
)

# Epilog text handed to argparse; the literal "%(prog)s" placeholder is kept
# untouched so that argparse can substitute the program name.
__EPILOG__ = u" ".join((
    u"%(prog)s",
    u"v" + __VERSION__,
    u"(c)",
    __AUTHOR__,
    u"%d-" % __YEAR__,
))

# Example invocation framed by two 60-character rules; the list items are
# joined with line separators and passed to argparse as the usage text.
__EXAMPLES__ = [
    u'',
    60 * u'-',
    u'%(prog)s --checksums filelist.md5 --directory /home/user/sorted --source /home/user/unsorted',
    60 * u'-',
]
class FullPaths(argparse.Action):
    """argparse action that stores the option value as an absolute path.

    A leading ``~`` is expanded to the user's home directory before the
    path is made absolute.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        expanded = os.path.expanduser(values)
        absolute = os.path.abspath(expanded)
        setattr(namespace, self.dest, absolute)
def is_dir(dirname: str) -> str:
    """Validate that *dirname* names an existing directory.

    :param dirname: Path to check.
    :return: *dirname* unchanged when it is a directory.
    :raises argparse.ArgumentTypeError: When *dirname* is not a directory.
    """
    if os.path.isdir(dirname):
        return dirname
    raise argparse.ArgumentTypeError("'{0}' is not a directory".format(dirname))
def parse_checksum_line(line: str):
    """Split one line of an md5sum-style checksum file into its parts.

    Both the text-mode layout (``<hash>  <name>``) and the binary-mode
    layout (``<hash> *<name>``) are accepted; the binary-mode ``*`` marker
    is not treated as part of the file name.

    :param line: Raw line from the checksum file, with or without newline.
    :return: ``(checksum, filename)`` with the checksum lower-cased, or
        ``None`` when the line is blank or carries no file name.
    """
    checksum, _separator, rest = line.strip().partition(" ")
    if rest.startswith("*"):
        # Binary-mode marker directly after the single separating space.
        rest = rest[1:]
    fname = rest.strip()
    if not checksum or not fname:
        return None
    return checksum.lower(), fname


def file_md5(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is read in *chunk_size* byte pieces so that large files are
    never held in memory as a whole.

    :raises OSError: When the file cannot be opened or read.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def read_checksums(lines, target_root: str) -> dict:
    """Build the ``checksum -> target path`` table from checksum file lines.

    :param lines: Iterable of lines (an open text file works).
    :param target_root: Absolute directory that every target must live in.
    :return: Dict mapping lower-case MD5 hex digests to target file paths.
        Blank lines are ignored; malformed lines, duplicate checksums
        (first one wins) and names that would land outside *target_root*
        are logged and skipped.
    """
    # Trailing separator so that "/a/b" does not count as being inside "/a/bc".
    base = os.path.join(target_root, "")
    checksums = {}
    for line in lines:
        parsed = parse_checksum_line(line)
        if parsed is None:
            if line.strip():
                log.warning("Skipping malformed line: '%s'", line.strip())
            continue
        checksum, fname = parsed
        target = os.path.normpath(os.path.join(target_root, fname))
        if not target.startswith(base):
            # Absolute names or "../" components would escape the target
            # directory chosen on the command line.
            log.error("Target '%s' is outside of '%s', skipping.", target, target_root)
            continue
        log.debug("Adding '%s' '%s'", checksum, target)
        if checksum in checksums:
            log.error("Checksum '%s' already exists in checksum list.", checksum)
            continue
        checksums[checksum] = target
    return checksums


def move_matching_files(source_dir: str, checksums: dict) -> int:
    """Move every file below *source_dir* whose MD5 is listed in *checksums*.

    :param source_dir: Directory that is walked recursively.
    :param checksums: Table as returned by :func:`read_checksums`.
    :return: Number of files that were moved.
    """
    moved = 0
    for dirpath, _dirnames, filenames in os.walk(source_dir):
        for name in filenames:
            fpath = os.path.join(os.path.abspath(dirpath), name)
            log.debug("Checksumming file: '%s'", fpath)
            try:
                csum = file_md5(fpath)
            except OSError as err:
                # One unreadable file must not abort the whole run.
                log.error("Cannot read '%s': %s", fpath, err)
                continue

            targetfile = checksums.get(csum)
            if targetfile is None:
                log.debug("Checksum not found: %s '%s'", csum, fpath)
                continue

            log.info("Checksum found: %s '%s'", csum, fpath)
            if os.path.normpath(fpath) == targetfile:
                # Happens when the target directory lies inside the source.
                log.debug("Already in place: '%s'", fpath)
                continue
            if os.path.lexists(targetfile):
                # Never overwrite; an existing directory would otherwise make
                # shutil.move() drop the file *into* that directory.
                log.error("File exists: '%s'", targetfile)
                continue

            os.makedirs(os.path.dirname(targetfile), exist_ok=True)
            log.info("Moving: '%s' -> '%s'", fpath, targetfile)
            shutil.move(fpath, targetfile)
            moved += 1
    return moved


def main() -> int:
    """Parse the command line and run the move; return the process exit code."""
    logging.basicConfig(
        format='%(asctime)s [%(levelname)s]: %(message)s',
        stream=sys.stdout,
        level=logging.INFO,
    )

    parser = argparse.ArgumentParser(
        description=__DESCRIPTION__,
        epilog=__EPILOG__,
        usage=os.linesep.join(__EXAMPLES__),
    )
    # The placeholder shown in --help belongs in ``metavar``; when it is part
    # of the option string the option only matches through prefix abbreviation.
    # The checksum file is only read, so it is opened read-only.
    parser.add_argument('--checksums', '-f', metavar='<file.md5>',
                        type=argparse.FileType('r', encoding='utf8'), dest='file',
                        required=True, help='MD5 Checksum file.')
    parser.add_argument('--directory', '-d', metavar='<directory>', action=FullPaths,
                        type=is_dir, dest='directory', required=True,
                        help='Target directory to move files to. Adds possible directory name from checksum file.')
    parser.add_argument('--source', '-s', metavar='<directory>', action=FullPaths,
                        type=is_dir, dest='sourcedir', required=True,
                        help='Source directory to read files from recursively.')
    parser.add_argument('--verbose', '-v', action='count', required=False, default=0, dest='verbose',
                        help="Be verbose. -vvv.. Be more verbose.")
    args = parser.parse_args()

    if args.verbose > 0:
        logging.getLogger().setLevel(logging.DEBUG)
        log.info("Being verbose")

    with args.file as f:
        log.info("Reading file '%s'", f.name)
        checksums = read_checksums(f, args.directory)

    if not checksums:
        log.critical("No checksums.")
        return 1

    log.info("Scanning directory '%s'..", args.sourcedir)
    move_matching_files(args.sourcedir, checksums)
    log.info("Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment