Skip to content

Instantly share code, notes, and snippets.

@rvprasad
Last active December 23, 2019 05:31
Show Gist options
  • Save rvprasad/15e8fd3102f461824b913a119be3653a to your computer and use it in GitHub Desktop.
Save rvprasad/15e8fd3102f461824b913a119be3653a to your computer and use it in GitHub Desktop.
Group files based on their modification time. Great for organizing photos and videos. Requires Python 3.7+.
import argparse
import glob
import hashlib
import logging
import os
import pathlib
import re
import shutil
import sys
import time
# Execute as "python3.exe time_based_file_organizer.py -o <output_folder> <input_folder1> <input_folder2> .... <input_folderN>"
# All files in each of the input folders and their descendent folders will be considered for grouping.
# Command-line interface: a mandatory "-o" output folder followed by one or
# more input folders given as positional arguments.
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
    '-o',
    required=True,
    nargs=1,  # the value is stored as a one-element list
    help="output folder",
)
arg_parser.add_argument('inputs', nargs='+', help="input folders")
# With no explicit argument list, argparse reads sys.argv[1:].
args = arg_parser.parse_args()
def get_hash(file_path):
    """Return the SHA-256 hex digest of the contents of *file_path*.

    The file is read in fixed-size chunks so that large files (e.g. videos)
    are hashed without loading their whole content into memory at once.

    Raises OSError if the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # iter() with a sentinel keeps reading until read() returns b'' (EOF).
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()
def get_dest(output_folder, src_path):
    """Compute where the file *src_path* should be placed under *output_folder*.

    The destination folder is named "YYYY-MM" after the file's modification
    time, interpreted in UTC.  Any " (N)" marker (a space followed by a
    parenthesised number) is removed from the file name.

    Parameters:
        output_folder: root folder of the organized tree.
        src_path: path of the existing source file (str or path object).

    Returns:
        A tuple (destination folder as pathlib.Path,
                 destination file path as pathlib.PurePath).

    Raises OSError if the modification time of *src_path* cannot be read.
    """
    src_path = pathlib.PurePath(src_path)  # also accept plain strings
    mtime = time.gmtime(os.path.getmtime(src_path))
    month_part = "{0:04d}-{1:02d}".format(mtime.tm_year, mtime.tm_mon)
    # Raw string: "\(" and "\d" are regex escapes, not string escapes.
    # Fall back to the original name if stripping would leave nothing.
    dest_name = re.sub(r' \(\d+\)', '', src_path.name) or src_path.name
    return (pathlib.Path(output_folder, month_part),
            pathlib.PurePath(output_folder, month_part, dest_name))
def get_files(folder):
    """Return the paths of all regular files in *folder* and its subfolders.

    Directories are excluded even when their name contains a dot, and files
    without an extension are included.  Names starting with a dot are not
    matched by the glob wildcard and are therefore skipped.

    Returns a list of path strings (empty if nothing matches).
    """
    pattern = str(pathlib.PurePath(folder, '**', '*'))
    # The wildcard also matches directories; keep only real files so that
    # callers can open every returned path.
    return [path for path in glob.glob(pattern, recursive=True)
            if os.path.isfile(path)]
# Main script: scan every input folder, keep one source file per distinct
# content hash, and copy each kept file into <output_folder>/<YYYY-MM>/.
output_folder = args.o[0]
# The log file is written inside the output folder, so the folder must exist
# before the file handler opens it.
pathlib.Path(output_folder).mkdir(parents=True, exist_ok=True)

logger = logging.getLogger('Main')
logger.setLevel(logging.INFO)
fh = logging.FileHandler(pathlib.Path(output_folder, 'photo-organizer.log'), mode='wt')
fh.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
logger.addHandler(fh)

hash_2_src_dest = {}   # content hash -> (source path, destination path)
dest_dirs = set()      # destination folders that have to be created
claimed_dests = set()  # destination paths already assigned to some file
num_clashes = 0        # number of files skipped as content duplicates

for input_folder in args.inputs:
    logger.info("Scanning {0}".format(input_folder))
    files = get_files(input_folder)
    for i, f in enumerate(files, 1):
        f_src = pathlib.PurePath(f)
        f_hash = get_hash(f_src)
        if f_hash in hash_2_src_dest:
            # Same content was already seen: keep the first copy only.
            num_clashes += 1
            logger.warning("{0} CLASH: {1} <> {2}".format(num_clashes, hash_2_src_dest[f_hash][0], f_src))
        else:
            dest_dir, f_dest = get_dest(output_folder, f_src)
            if f_dest in claimed_dests:
                # A file with different content already maps to this path.
                # Copying both there would silently overwrite one of them,
                # so tag this one with part of its content hash instead.
                f_dest = f_dest.with_name(
                    "{0}-{1}{2}".format(f_dest.stem, f_hash[:16], f_dest.suffix))
                logger.warning("NAME CLASH: {0} will be copied as {1}".format(f_src, f_dest))
            claimed_dests.add(f_dest)
            hash_2_src_dest[f_hash] = (f_src, f_dest)
            dest_dirs.add(dest_dir)
        if i % 500 == 0:
            logger.info("Scanned {0} files".format(i))
            fh.flush()

logger.info(len(hash_2_src_dest))
# Every kept file must own a distinct destination; otherwise a copy below
# would overwrite another one.  Checked explicitly (not with assert) so the
# guard is still active under "python -O".
if len(claimed_dests) != len(hash_2_src_dest):
    raise RuntimeError("destination paths are not unique; refusing to overwrite files")

for d in dest_dirs:
    d.mkdir(parents=True, exist_ok=True)

for i, v in enumerate(hash_2_src_dest.values(), 1):
    logger.info("{0}> copying {1} to {2}".format(i, v[0], v[1]))
    shutil.copy2(v[0], v[1])  # copy2 also attempts to preserve file metadata

logging.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment