Created
January 20, 2019 18:10
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# set expandtab ts=4 sw=4 ai fileencoding=utf-8
#
# Author: PB
# Maintainer(s): PB
# License: (c) HRDAG 2019, GPL v2 or newer
#
# -----------------------------------------------------------
# archiver/bin/getpix.py
import argparse | |
import os | |
import re | |
import shutil | |
import datetime | |
from subprocess import Popen, PIPE, run | |
import collections | |
import uuid | |
import sys | |
from pathlib import Path | |
# Capture group 1 is the YYYY/MM/DD directory triple that follows "Masters/".
date_re = re.compile(r'Masters/(\d{4}/\d{2}/\d{2})')

# Lower-cased file suffixes that the copy loop accepts; anything else is
# counted as 'suffix not found' and skipped.
photo_exts = {
    '.cr2', '.nef', '.raf', '.rw2',
    '.jpg', '.jpeg', '.tif', '.tiff',
    '.mov', '.mp4',
}

# Separates the file path from the timestamp value on an exiv2 output line.
splt = re.compile(r'Image timestamp : ')

# Each whitespace run, parenthesis, comma or colon in a file name is
# replaced by "_" before the name is used at the destination.
fnsanitizer = re.compile(r'\s+|\(|\)|,|:')

# Running tallies, printed in the progress and final summary lines.
cnts = collections.defaultdict(int)
def getargs():
    """Parse the command line and return the resulting namespace.

    The namespace carries ``verbose`` (bool) plus ``source`` and
    ``destination``, each a one-element list because both positionals
    are declared with ``nargs=1``.
    """
    summary = "".join((
        "copies image files from a",
        " source directory into a ",
        " bydate/YYYY/MM/DD structure,",
        " preserving the EXIF date when possible.",
        " Depends on gnu find and exiv2 on path.",
    ))
    cli = argparse.ArgumentParser(description=summary)
    cli.add_argument("--verbose", '-v', action='store_true')
    for positional in ("source", "destination"):
        cli.add_argument(positional, nargs=1)
    return cli.parse_args()
def _now():
    """Return the current local time as ``YYYY-MM-DDTHH:MM:SS``."""
    moment = datetime.datetime.now()
    # timespec='seconds' drops the fractional part, leaving 19 characters.
    return moment.isoformat(timespec='seconds')
# Text that separates the file path from the timestamp on an exiv2 line.
_EXIV2_TS_LABEL = 'Image timestamp : '


def _parse_exiv2_ts_line(line):
    """Turn one exiv2 output line into ``(file name, 'YYYY/MM/DD')``.

    Returns None for any line that lacks the timestamp label, has no
    file-name prefix, or has an empty timestamp value, so unrelated lines
    can never raise and files without a usable EXIF date are simply
    absent from the result.
    """
    prefix, label, stamp = line.partition(_EXIV2_TS_LABEL)
    if not label:
        return None
    name = Path(prefix.strip()).name
    # Keep only the date part ("YYYY:MM:DD") and switch to path separators.
    day = stamp.strip()[0:10].replace(':', '/')
    if not name or not day:
        return None
    return name, day


def get_exif_ts_dir(dpath):
    """Map file names directly inside *dpath* to their EXIF date.

    Runs ``find <dpath> -maxdepth 1 -type f -print0 | xargs -0 exiv2`` and
    collects every ``Image timestamp`` line from the output.

    Args:
        dpath: directory to inspect (not recursive).

    Returns:
        dict of ``{file name: 'YYYY/MM/DD'}``. Files for which exiv2
        reports no timestamp are absent, so callers can fall back to
        another date source. Empty dict when nothing was found.

    Raises:
        OSError: if ``find`` or ``xargs`` cannot be started.

    The helpers' stderr and exit statuses are deliberately ignored: this
    is a best-effort lookup.
    """
    # NOTE(review): relies on exiv2 prefixing each line with the file path,
    # which it appears to do only when given several files -- confirm.
    dpath = str(Path(dpath).resolve())
    findcmd = ['find', dpath, '-maxdepth', '1', '-type', 'f', '-print0']
    with open(os.devnull, 'w') as fnull:
        # The context managers close the pipes and reap both children, so
        # repeated calls do not leave zombies or open descriptors behind.
        with Popen(findcmd, stdout=PIPE, stderr=fnull) as finder, \
                Popen(['xargs', '-0', 'exiv2'], stdin=finder.stdout,
                      stdout=PIPE, stderr=fnull) as tagger:
            # Drop our copy of the read end so find receives SIGPIPE if
            # xargs exits early instead of blocking on a full pipe.
            finder.stdout.close()
            raw, _ = tagger.communicate()
    # os.fsdecode matches how os.walk decodes file names and never raises
    # on bytes that are not valid in the filesystem encoding.
    parsed = (_parse_exiv2_ts_line(ln) for ln in os.fsdecode(raw).split('\n'))
    return dict(item for item in parsed if item is not None)
def date_from_pth(pth):
    """Derive a ``YYYY/MM/DD`` date from a ``Masters/YYYY/MM/DD`` path part.

    Returns the matched date string, or ``'no_date'`` when the path does
    not contain such a component. Either outcome is tallied in ``cnts``.
    """
    found = date_re.search(str(pth))
    if found is None:
        cnts['no_date'] += 1
        return 'no_date'
    cnts['date from ap path'] += 1
    return found.group(1)
def well_formed_dt(d, min_year=1900, max_year=None):
    """Normalise a ``YYYY[/MM[/DD]]`` string to zero-padded ``YYYY/MM/DD``.

    Args:
        d: slash-separated date string; a missing month or day defaults
            to ``01``.
        min_year: earliest year accepted (inclusive).
        max_year: latest year accepted (inclusive). Defaults to next
            calendar year, which tolerates a slightly fast camera clock
            without the cut-off going stale as a fixed year would.

    Returns:
        The normalised date, or ``'no_date'`` when *d* has more than three
        parts, a part is not an integer, the year is out of range, or the
        parts do not form a real calendar date.

    The result is used as a directory path below the destination root, so
    every part must be numeric: a value such as ``2019/../..`` would
    otherwise place files outside that root.
    """
    if max_year is None:
        max_year = datetime.date.today().year + 1
    parts = d.split('/')
    if len(parts) > 3:
        return 'no_date'
    parts += ['01'] * (3 - len(parts))
    try:
        year, month, day = (int(p) for p in parts)
        if not min_year <= year <= max_year:
            return 'no_date'
        # Constructing the date rejects impossible months and days.
        datetime.date(year, month, day)
    except ValueError:
        return 'no_date'
    return f"{year:04d}/{month:02d}/{day:02d}"
def main():
    """Copy photo files from the source tree into ``bydate/YYYY/MM/DD``.

    For every file under the source directory whose suffix is in
    ``photo_exts``, the date directory comes from the EXIF timestamp when
    ``get_exif_ts_dir`` supplies one, otherwise from the file's path. A
    file whose destination already exists with the same size is skipped;
    one that exists with a different size is copied under a fresh UUID
    name. Tallies are kept in ``cnts`` and printed at the end.

    Exits with an error message if the destination directory is not
    named ``bydate``.
    """
    args = getargs()
    dstroot = Path(args.destination[0]).resolve()
    # An explicit check rather than assert, which is stripped under -O.
    if dstroot.name != 'bydate':
        sys.exit(f"destination must be a directory named 'bydate': {dstroot}")

    print(f"start {_now()}")
    srcroot = Path(args.source[0]).resolve()
    for dirpath, _dirs, files in os.walk(srcroot):
        if not files:
            continue  # nothing to copy, so skip the exiv2 lookup as well
        rpth = Path(dirpath)
        name2ts = get_exif_ts_dir(dirpath)
        for f in files:
            pth = rpth / f
            suff = pth.suffix.strip().lower()
            if suff not in photo_exts:
                cnts['suffix not found'] += 1
                continue

            filedatepart = name2ts.get(f, None)
            if filedatepart is None:
                filedatepart = date_from_pth(pth)
                cpsym = '-'  # date taken from the path (or none found)
            else:
                cpsym = '='  # date taken from EXIF
            filedatepart = well_formed_dt(filedatepart)

            fname = fnsanitizer.sub('_', f)
            dst = dstroot / filedatepart / fname
            try:
                dstat = dst.stat()
            except FileNotFoundError:
                dstat = None  # OK to write

            if dstat is not None:
                # Equal size is taken to mean the file was already copied.
                if pth.stat().st_size == dstat.st_size:
                    cnts['found identical'] += 1
                    continue
                # Same name, different content: keep both under a new name.
                fname = str(uuid.uuid4()) + pth.suffix.lower()
                dst = dstroot / filedatepart / fname
                cnts['renamed w uuid'] += 1

            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(pth, dst, follow_symlinks=False)
            if args.verbose:
                print(f"{pth} {cpsym}> {dst}")
            cnts['copied'] += 1
            if args.verbose and cnts['copied'] % 500 == 0:
                print(f"{_now()}: {dict(cnts)}.")

    print(f"{_now()}: {dict(cnts)}. DONE.")


if __name__ == '__main__':
    main()
# done.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I ramble on about this gist in a blogpost.