"""
Google Photos Takeout Processor
This tool takes an unzipped and merged (for takeouts above 10 GB) Google Photos Takeout, filters out
duplicates, applies metadata and moves all files to an output folder, with each file renamed to the
format `YYYY-MM-DD_HH-MM-SS {original_filename}`.
Setup:
1. Install tqdm: `pip install tqdm`
2. Install loguru: `pip install loguru`
3. Download exiftool from https://exiftool.org/ and specify the binary location in EXIFTOOL_BINARY_LOCATION below.
- You can also just use `exiftool` as the value if it is on the PATH and can be found by cmd.
Typical usage: python google_photos_takeout_metadata_applier.py -i {folder containing takeout(s)} -o {output folder}
The goal of this was to do something like https://github.com/TheLastGimbus/GooglePhotosTakeoutHelper, but with support for video formats as well.
"""
import argparse
import datetime
import json
import os
import re
import subprocess
from collections import defaultdict
import hashlib
from pathlib import Path
import sys
from typing import List, Dict, Optional
from tqdm import tqdm # pip install tqdm
from loguru import logger # pip install loguru
EXIFTOOL_BINARY_LOCATION = r"C:\Users\Brent\Programs\exiftool.exe" # Docs: https://exiftool.org/exiftool_pod.html
SUPPORTED_IMAGE_FORMATS = ['png', 'jpg', 'jpeg', 'gif', 'heic']
SUPPORTED_VIDEO_FORMATS = ['mp4', 'mov']
# Setup logging
logger.remove()
logger.add(lambda msg: tqdm.write(msg, end=""), format="{message}", level="INFO")
logger.add("{time}.log", level="DEBUG", encoding="utf8") # Set level="TRACE" to get more info
class ExifToolWrapper(object):
""" src: https://stackoverflow.com/a/10075210/3774244 """
sentinel = b"{ready}\r\n"
def __enter__(self):
self.process = subprocess.Popen(
[EXIFTOOL_BINARY_LOCATION, "-stay_open", "True", "-@", "-"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT # A lot of warnings are written to stderr by exiftool - this pipes them to stdout so they can be logged and exiftool doesn't block waiting for us to read the stream (as it just goes into what we are already reading)
)
logger.debug('Opened exiftool')
return self
def __exit__(self, exc_type, exc_value, traceback):
self.process.stdin.write(b"-stay_open\nFalse\n")
self.process.stdin.flush()
logger.debug('Closed exiftool')
def execute(self, *args):
args = args + ("-execute\n",)
self.process.stdin.write(str.join("\n", args).encode())
self.process.stdin.flush()
logger.trace(f'Wrote to exiftool: {args}')
output = b""
fd = self.process.stdout.fileno()
while not output.endswith(self.sentinel):
output += os.read(fd, 4096)
logger.trace(f'Read from exiftool: {output}')
# Check for errors
if b"0 image files updated" in output:
logger.error(f"Error updating file with exiftool: {output}")
return output[:-len(self.sentinel)]
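# Illustrative use of the wrapper above (a sketch only - the tag and file path are made-up examples
# and nothing here is executed by this script):
#
#   with ExifToolWrapper() as et:
#       raw = et.execute('-s', '-s', '-s', '-exif:DateTimeOriginal', r'C:\photos\IMG_1234.jpg')
#       print(raw.decode())
#
# Each execute() call appends "-execute\n", prompting the long-running exiftool process to reply
# and finish with the "{ready}" sentinel that the read loop above waits for.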
class MediaFile:
""" Class for keeping track of an individual media file """
path: Path
metadata: Optional[Dict] = None
def __init__(self, path: Path):
self.path = path
def get_low_cost_hash(self) -> str:
""" A md5 sum of part of this file. Intended to be used to check for **potential** matches. """
hash_md5 = hashlib.md5()
with open(str(self.path), "rb") as f:
            chunk = f.read(100000)  # Read the first ~100 KB of the file
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_hash(self) -> str:
""" A md5 sum of this file """
hash_md5 = hashlib.md5()
with open(str(self.path), "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def __get_meta_data_json_file(self) -> Optional[Path]:
""" Tries to identify the files associated .json file """
# Typical
attempt1 = Path(str(self.path) + '.json')
if attempt1.exists():
return attempt1
# Seems that jpg -> jpeg sometimes
attempt2 = Path(str(self.path).replace('jpg', 'jpeg') + '.json')
if attempt2.exists():
return attempt2
# This one was odd
attempt3 = Path(str(self.path).replace('_o.jpg', '_.json'))
if str(self.path).endswith('_o.jpg') and attempt3.exists():
return attempt3
# Photos edited in Google Photos (shouldn't match anymore after `pre_clean_directory`)
attempt4 = Path(str(self.path).replace('-edited', '') + '.json')
if '-edited.' in str(self.path) and attempt4.exists():
return attempt4
# Duplicated file names when uploading (still has some issues)
index_search = re.search(r"\((\d+)\)\.", self.path.name)
if index_search is not None:
index = index_search[1]
attempt5 = self.path.parent / (self.path.name.replace(f'({index})', '') + f'({index}).json')
if attempt5.exists():
return attempt5
# Looks like if the file name is too long, the json filename gets truncated (*facepalm*)
attempt6 = self.path.parent / (str(self.path.name[0:46]) + '.json')
if attempt6.exists():
return attempt6
# TODO Alternatively, load all of the .json files and use "title" to match file
raise Exception(f'Could not find metadata file for {self.path}')
def __get_meta_data_from_json(self) -> Dict:
""" Finds the json file containing metadata for this file stored by Google that exists in the takeout """
if self.metadata is not None:
return self.metadata # Cached
file = self.__get_meta_data_json_file()
if file is None:
return {}
with open(str(file)) as f:
metadata = json.load(f)
self.metadata = metadata # Cache for later
return metadata
def set_file_meta_data_from_json(self, exif_tool: ExifToolWrapper):
""" Sets metadata for the current file using the data found in the associated json file """
# Get metadata
metadata = self.__get_meta_data_from_json()
date_taken_formatted = self.get_original_date_created().strftime("%Y:%m:%d %H:%M:%S")
latitude = metadata['geoData']['latitude']
longitude = metadata['geoData']['longitude']
altitude = metadata['geoData']['altitude']
latitude_ref = 'N' if latitude >= 0 else 'S'
longitude_ref = 'E' if longitude >= 0 else 'W'
        altitude_ref = '0' if altitude >= 0 else '1'  # 0 = Above Sea Level
# Identify arguments to pass
extension = self.path.suffix.lower()[1:]
exiftool_args = [
f'-FileCreateDate="{date_taken_formatted}"',
f'-FileModifyDate="{date_taken_formatted}"',
]
if extension in ['jpg', 'jpeg', 'heic']:
exiftool_args.append(f'-exif:DateTimeOriginal="{date_taken_formatted}"')
if latitude != 0:
exiftool_args.append(f'-exif:gpslatitude={latitude}')
exiftool_args.append(f'-exif:gpslatituderef={latitude_ref}')
if longitude != 0:
exiftool_args.append(f'-exif:gpslongitude={longitude}')
exiftool_args.append(f'-exif:gpslongituderef={longitude_ref}')
if altitude != 0:
exiftool_args.append(f'-exif:GPSAltitude={altitude}')
exiftool_args.append(f'-exif:GPSAltitudeRef={altitude_ref}')
elif extension == 'png':
exiftool_args.append(f'-png:CreationTime="{date_taken_formatted}"')
elif extension == 'gif':
pass # GIF doesn't take what we have
elif extension in SUPPORTED_VIDEO_FORMATS:
exiftool_args.append(f'-quicktime:CreateDate="{date_taken_formatted}"')
exiftool_args.append(f'-quicktime:ModifyDate="{date_taken_formatted}"')
exiftool_args.append(f'-quicktime:TrackCreateDate="{date_taken_formatted}"')
exiftool_args.append(f'-quicktime:TrackModifyDate="{date_taken_formatted}"')
exiftool_args.append(f'-quicktime:MediaCreateDate="{date_taken_formatted}"')
exiftool_args.append(f'-quicktime:MediaModifyDate="{date_taken_formatted}"')
else:
raise Exception(f"File type not supported: .{extension}")
# Execute the write operation
logger.debug(f"Setting metadata for {self.path} using {exiftool_args}")
complete_args = [
*exiftool_args,
'-overwrite_original', # Don't create copies of the originals
'-m', # Ignore minor errors
str(self.path)
]
exif_tool.execute(*complete_args)
def get_original_date_created(self) -> datetime.datetime:
""" Identify the original date created of the file using the metadata from the associated json file """
metadata = self.__get_meta_data_from_json()
assert int(metadata['photoTakenTime']['timestamp']) != 0
date_taken = datetime.datetime.fromtimestamp(int(metadata['photoTakenTime']['timestamp']))
return date_taken
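# The Takeout sidecar .json files contain (among other things) the fields MediaFile relies on.
# A trimmed, made-up example for illustration:
#
#   {
#       "title": "IMG_1234.jpg",
#       "photoTakenTime": {"timestamp": "1577836800"},
#       "geoData": {"latitude": -36.85, "longitude": 174.76, "altitude": 12.0}
#   }
#
# `photoTakenTime.timestamp` drives both the rewritten dates and the output filename, while
# `geoData` feeds the GPS EXIF tags written for JPEG/HEIC files.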
def pre_clean_directory(root: Path):
"""
Clean up the directory before processing
    - Any filename ending in "-edited.{ext}" should have the original removed and the edited file renamed into the original's place
- Any .heic files should have their associated .mp4 files removed
"""
# Find all files with "-edited.{ext}" in the name, for each file, see if an original exists to be replaced
edited_files = list(root.glob('**/*-edited.*'))
for edited_file in tqdm(edited_files, desc='Cleaning up edited files', unit='files'):
original_file = edited_file.with_name(edited_file.stem.replace('-edited', '')).with_suffix(edited_file.suffix)
if original_file.exists():
logger.debug(f"Found edited file, {edited_file}, and original file, {original_file}. Deleting original and renaming edited file to original.")
original_file.unlink()
edited_file.rename(original_file)
# Find all .heic files and remove their associated .mp4 files
heic_files = list(root.glob('**/*.heic'))
for heic_file in tqdm(heic_files, desc='Cleaning up HEIC-related MP4 files', unit='files'):
mp4_file = heic_file.with_suffix('.mp4')
if mp4_file.exists():
logger.debug(f"Found MP4 file related to an HEIC file: {mp4_file}. Deleting.")
mp4_file.unlink()
def identify_all_supported_files(root: Path, delete_exiftool_tmp_file_when_found: bool) -> List[MediaFile]:
""" Get a list of all supported files and optionally remove any temporary exiftool files while doing this """
files_discovered: List[MediaFile] = []
for path_object in tqdm(root.glob('**/*'), desc='Identifying files', unit='objects'):
# Add any supported files to be processed
if path_object.is_file() and path_object.suffix[1:].lower() in (SUPPORTED_IMAGE_FORMATS + SUPPORTED_VIDEO_FORMATS):
files_discovered.append(MediaFile(path_object))
# Remove any temporary exiftool files from previous early exits
if delete_exiftool_tmp_file_when_found and path_object.is_file() and path_object.name.endswith('_exiftool_tmp'):
os.remove(str(path_object))
return files_discovered
def create_low_cost_hash_buckets(files: List[MediaFile]) -> Dict[str, List[MediaFile]]:
""" Create dict[hash] = MediaFile[] buckets using a hashing method that doesn't ready the whole file """
hash_buckets: Dict[str, List[MediaFile]] = defaultdict(list)
for file in tqdm(files, desc='Getting low-cost hashes', unit='files'):
hash_buckets[file.get_low_cost_hash()].append(file)
return hash_buckets
def duplicate_files_using_low_cost_hash_buckets(buckets: Dict[str, List[MediaFile]]) -> List[List[MediaFile]]:
""" Using the low cost hash buckets, identify potentially duplicated files then fully hash them to identify duplicated files """
high_cost_hash_buckets: Dict[str, List[MediaFile]] = defaultdict(list)
for low_cost_hash in tqdm(buckets, desc='Identifying duplicate files', unit='hash-buckets'):
if len(buckets[low_cost_hash]) > 1:
for media_file in buckets[low_cost_hash]:
high_cost_hash_buckets[media_file.get_hash()].append(media_file)
return list(high_cost_hash_buckets.values())
def filter_duplicates(duplicate_file_groups: List[List[MediaFile]], files_to_process: List[MediaFile]) -> List[MediaFile]:
""" Remove duplicate files from the files to process by keeping the first instance of the duplicates """
files_to_filter_out: List[str] = []
for file_group in duplicate_file_groups:
for file in file_group[1:]:
logger.debug(f"Found duplicate file: {file}. Skipping.")
files_to_filter_out.append(str(file.path.absolute()))
return list(filter(
lambda mf: str(mf.path.absolute()) not in files_to_filter_out,
files_to_process
))
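# Taken together, the three functions above form a two-pass de-duplication: a cheap partial-file
# MD5 groups candidate duplicates, a full MD5 is computed only within those groups, and every file
# after the first in each full-hash group is dropped. Roughly (variable names are illustrative):
#
#   buckets = create_low_cost_hash_buckets(media_files)
#   duplicate_groups = duplicate_files_using_low_cost_hash_buckets(buckets)
#   unique_files = filter_duplicates(duplicate_groups, media_files)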
def update_meta_data(files: List[MediaFile], exif_tool: ExifToolWrapper):
""" Update the metadata for each media file provided """
for file in tqdm(files, desc='Updating file metadata', unit='files'):
file.set_file_meta_data_from_json(exif_tool)
def move_files(files: List[MediaFile], output_dir: Path):
""" Move each file provided to the output folder and rename to `YYYY-MM-DD_HH-MM-SS {original_filename}` """
for file in tqdm(files, desc='Moving files', unit='files'):
assert 'photoTakenTime' in file.metadata and 'timestamp' in file.metadata['photoTakenTime'] # Validates we have a timestamp on everything
new_filename = f'{file.get_original_date_created().strftime("%Y-%m-%d_%H-%M-%S")} {file.path.name}'
new_pathname = output_dir / new_filename
os.rename(str(file.path), str(new_pathname))
def report_remaining_files(root: Path):
""" Report the files that were either skipped due to duplicates or non-supported files that are still in the source directory """
file_counts: Dict[str, int] = defaultdict(lambda: 0)
for path_object in tqdm(root.glob('**/*'), desc='Identifying remaining files', unit='objects'):
if path_object.is_file():
file_counts[path_object.suffix.lower()] += 1
sorted_file_counts = dict(sorted(file_counts.items(), key=lambda item: item[1], reverse=True))
logger.info('\nRemaining files (including duplicates ignored):')
logger.info(f'| {"Extension":^11} | {"Count":^7} |')
logger.info(f'| {"-"*11} | {"-"*7} |')
for extension in sorted_file_counts:
logger.info(f'| {extension:>11} | {file_counts[extension]:7} |')
@logger.catch(onerror=lambda e: sys.exit(1))
def main(input_directory: Path, output_directory: Path, delete_exiftool_tmp_files: bool):
assert input_directory.exists()
output_directory.mkdir(parents=True, exist_ok=True)
# Step 1. Clear some files we won't want to process
logger.info("Pre-cleaning input directory")
pre_clean_directory(input_directory)
# Step 2. Identify all images to process
logger.info("Identifying all supported files")
media_files: List[MediaFile] = identify_all_supported_files(input_directory, delete_exiftool_tmp_files)
# Step 3. Filter duplicates
logger.info("Filtering duplicates")
low_cost_hash_buckets = create_low_cost_hash_buckets(media_files) # Get any potential matches (using a low cost method)
duplicate_files = duplicate_files_using_low_cost_hash_buckets(low_cost_hash_buckets) # Then identify duplicate files
media_files_filtered = filter_duplicates(duplicate_files, media_files) # Deal with duplicates
# Step 4. Update metadata using .json files
logger.info("Updating metadata")
with ExifToolWrapper() as exif_tool_wrapper:
update_meta_data(media_files_filtered, exif_tool_wrapper)
# Step 5. Move images into output
logger.info("Moving files to output directory")
move_files(media_files_filtered, output_directory)
# Step 6. Report all files left over
logger.info("Creating report of remaining files")
report_remaining_files(input_directory)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog='Google Photos Takeout Processor',
        usage='python google_photos_takeout_metadata_applier.py -i [INPUT TAKEOUT FOLDER] -o [OUTPUT FOLDER]',
        description=
        """This script takes all of your photos and videos from a Google Photos takeout,
        fixes their exif data and file creation date using the data kept by Google,
        and then moves all files into one folder.
"""
)
parser.add_argument(
'-i', '--input-folder',
type=str,
required=True,
help='Input folder with contents of Google Photos takeout (unzipped)'
)
parser.add_argument(
'-o', '--output-folder',
type=str,
required=False,
default='./output',
        help='Output folder in which all photos will be placed'
)
parser.add_argument(
'--dont-delete-exiftool-tmp-files',
action='store_true',
help="Don't delete *_exiftool_tmp files that exists from previous early exits"
)
arguments = parser.parse_args()
input_directory = Path(arguments.input_folder).absolute()
output_directory = Path(arguments.output_folder).absolute()
delete_exiftool_tmp_files = not arguments.dont_delete_exiftool_tmp_files
main(input_directory, output_directory, delete_exiftool_tmp_files)
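# ---------------------------------------------------------------------------
# media_renaming.py - a separate, standalone script that renames and moves
# media using dates already stored in the files themselves (no Takeout .json
# sidecars required).
# ---------------------------------------------------------------------------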
"""
Renames media files to the format `YYYY-MM-DD_HH-MM-SS {original_filename}` and moves them into an output folder.
This will move files! Make sure you have a backup of your files before running this script.
Usage:
python media_renaming.py --input-folder [input-folder] --output-folder [output-folder]
"""
import argparse
import datetime
import os
import subprocess
from collections import defaultdict
import hashlib
from pathlib import Path
from typing import List, Dict, Optional
from tqdm import tqdm
EXIFTOOL_BINARY_LOCATION = r"C:\Users\Brent\Programs\exiftool.exe"
SUPPORTED_IMAGE_FORMATS = ['png', 'jpg', 'jpeg', 'gif']
SUPPORTED_VIDEO_FORMATS = ['mp4', 'mov']
class ExifToolWrapper(object):
""" src: https://stackoverflow.com/a/10075210 """
sentinel = b"{ready}\r\n"
def __enter__(self):
self.process = subprocess.Popen(
[EXIFTOOL_BINARY_LOCATION, "-stay_open", "True", "-@", "-"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE # A lot of warnings are written to stderr by exiftool - remove this line to debug any exiftool issues
)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.process.stdin.write(b"-stay_open\nFalse\n")
self.process.stdin.flush()
print('Closed exiftool')
def execute(self, *args):
args = args + ("-execute\n",)
self.process.stdin.write(str.join("\n", args).encode())
self.process.stdin.flush()
output = b""
fd = self.process.stdout.fileno()
while not output.endswith(self.sentinel):
output += os.read(fd, 4096)
return output[:-len(self.sentinel)]
class MediaFile:
""" Class for keeping track of an individual media file """
path: Path
date_created: Optional[datetime.datetime] = None
def __init__(self, path: Path):
self.path = path
def get_low_cost_hash(self) -> str:
""" A md5 sum of part of this file. Intended to be used to check for **potential** matches. """
hash_md5 = hashlib.md5()
with open(str(self.path), "rb") as f:
chunk = f.read(10000) # Read ~10kb
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_hash(self) -> str:
""" A md5 sum of this file """
hash_md5 = hashlib.md5()
with open(str(self.path), "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_original_date_created(self, exif_tool: ExifToolWrapper):
""" Identify the original date created of the file using the timestamp on the file """
exiftool_args = ['-s', '-s', '-s']
extension = self.path.suffix.lower()[1:]
if extension in ['jpg', 'jpeg']:
exiftool_args.append('-exif:DateTimeOriginal')
exiftool_args.append('-XMP:DateCreated')
elif extension == 'png':
exiftool_args.append('-png:CreationTime')
exiftool_args.append('-exif:DateTimeOriginal')
elif extension in SUPPORTED_VIDEO_FORMATS:
exiftool_args.append('-quicktime:CreateDate')
else:
raise Exception("Unsupported file")
# Execute the read operation
complete_args = [
*exiftool_args,
'-m', # Ignore minor errors
str(self.path)
]
value = exif_tool.execute(*complete_args)
values = value.decode().split('\r\n')
for clean_value in values:
if clean_value == '':
continue
try:
date = datetime.datetime.strptime(clean_value, '%Y:%m:%d %H:%M:%S')
break
            except ValueError:
                # Fall back to values ending in a literal "+12:00" offset (the offset is matched but not applied)
                date = datetime.datetime.strptime(clean_value, '%Y:%m:%d %H:%M:%S+12:00')
                break
else:
raise Exception(f'No date found on {self.path}')
# Update the create and modify date at the same time
date_taken_formatted = date.strftime("%Y:%m:%d %H:%M:%S")
complete_args = [
f'-FileCreateDate="{date_taken_formatted}"',
f'-FileModifyDate="{date_taken_formatted}"',
'-overwrite_original', # Don't create copies of the originals
'-m', # Ignore minor errors
str(self.path)
]
exif_tool.execute(*complete_args)
self.date_created = date
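# With '-s -s -s', exiftool prints bare tag values only, one per line, so a successful read looks
# roughly like (made-up value):
#
#   2020:01:01 14:32:05
#
# which is what the strptime parsing above expects, optionally with a trailing "+12:00" offset.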
def identify_all_supported_files(root: Path) -> List[MediaFile]:
""" Get a list of all supported files """
files_discovered: List[MediaFile] = []
for path_object in tqdm(root.glob('**/*'), desc='Identifying files', unit='objects'):
# Add any supported files to be processed
if path_object.is_file() and path_object.suffix[1:].lower() in (SUPPORTED_IMAGE_FORMATS + SUPPORTED_VIDEO_FORMATS):
files_discovered.append(MediaFile(path_object))
return files_discovered
def create_low_cost_hash_buckets(files: List[MediaFile]) -> Dict[str, List[MediaFile]]:
""" Create dict[hash] = MediaFile[] buckets using a hashing method that doesn't ready the whole file """
hash_buckets: Dict[str, List[MediaFile]] = defaultdict(list)
for file in tqdm(files, desc='Getting low-cost hashes', unit='files'):
hash_buckets[file.get_low_cost_hash()].append(file)
return hash_buckets
def duplicate_files_using_low_cost_hash_buckets(buckets: Dict[str, List[MediaFile]]) -> List[List[MediaFile]]:
""" Using the low cost hash buckets, identify potentially duplicated files then fully hash them to identify duplicated files """
high_cost_hash_buckets: Dict[str, List[MediaFile]] = defaultdict(list)
for low_cost_hash in tqdm(buckets, desc='Identifying duplicate files', unit='hash-buckets'):
if len(buckets[low_cost_hash]) > 1:
for media_file in buckets[low_cost_hash]:
high_cost_hash_buckets[media_file.get_hash()].append(media_file)
return list(high_cost_hash_buckets.values())
def filter_duplicates(duplicate_file_groups: List[List[MediaFile]], files_to_process: List[MediaFile]) -> List[MediaFile]:
""" Remove duplicate files from the files to process by keeping the first instance of the duplicates """
files_to_filter_out: List[str] = []
for file_group in duplicate_file_groups:
for file in file_group[1:]:
files_to_filter_out.append(str(file.path.absolute()))
print(f'Duplicates to skip: {len(files_to_filter_out)}')
return list(filter(
lambda mf: str(mf.path.absolute()) not in files_to_filter_out,
files_to_process
))
def get_original_dates_created(files: List[MediaFile], exif_tool: ExifToolWrapper):
""" Get the original date created for each file and make sure it exists """
for file in tqdm(files, desc='Getting original dates', unit='files'):
file.get_original_date_created(exif_tool)
def move_files(files: List[MediaFile], output_dir: Path):
""" Move each file provided to the output folder and rename to `YYYY-MM-DD_HH-MM-SS {original_filename}` """
for file in tqdm(files, desc='Moving files', unit='files'):
assert file.date_created is not None
filename_prefix = file.date_created.strftime("%Y-%m-%d_%H-%M-%S")
if file.path.name.startswith(filename_prefix):
new_filename = file.path.name
else:
new_filename = f'{filename_prefix} {file.path.name}'
new_pathname = output_dir / new_filename
os.rename(str(file.path), str(new_pathname))
def report_remaining_files(root: Path):
""" Report the files that were either skipped due to duplicates or non-supported files that are still in the source directory """
file_counts: Dict[str, int] = defaultdict(lambda: 0)
for path_object in tqdm(root.glob('**/*'), desc='Identifying remaining files', unit='objects'):
if path_object.is_file():
file_counts[path_object.suffix.lower()] += 1
sorted_file_counts = dict(sorted(file_counts.items(), key=lambda item: item[1], reverse=True))
print('\nRemaining files (including duplicates ignored):')
print(f'| {"Extension":^11} | {"Count":^7} |')
print(f'| {"-"*11} | {"-"*7} |')
for extension in sorted_file_counts:
print(f'| {extension:>11} | {file_counts[extension]:7} |')
if __name__ == "__main__":
parser = argparse.ArgumentParser(
        prog='Media Renamer',
        usage='python media_renaming.py -i [INPUT FOLDER] -o [OUTPUT FOLDER]',
        description=
        """This script reads the original creation date stored in each photo or video's metadata,
        syncs the file create/modify dates to that date, and then renames and moves all files into one folder.
"""
)
parser.add_argument(
'-i', '--input-folder',
type=str,
required=True,
help='Input folder with contents of Google Photos takeout (unzipped)'
)
parser.add_argument(
'-o', '--output-folder',
type=str,
required=False,
default='./output',
        help='Output folder in which all photos will be placed'
)
arguments = parser.parse_args()
input_directory = Path(arguments.input_folder).absolute()
output_directory = Path(arguments.output_folder).absolute()
assert input_directory.exists()
output_directory.mkdir(parents=True, exist_ok=True)
# Step 1. Identify all images to process
media_files: List[MediaFile] = identify_all_supported_files(input_directory)
# Step 2. Filter duplicates
low_cost_hash_buckets = create_low_cost_hash_buckets(media_files) # Get any potential matches (using a low cost method)
duplicate_files = duplicate_files_using_low_cost_hash_buckets(low_cost_hash_buckets) # Then identify duplicate files
media_files_filtered = filter_duplicates(duplicate_files, media_files) # Deal with duplicates
# Step 3. Get dates created from files
with ExifToolWrapper() as exif_tool_wrapper:
get_original_dates_created(media_files_filtered, exif_tool_wrapper)
# Step 4. Move images into output
move_files(media_files_filtered, output_directory)
# Step 5. Report all files left over
report_remaining_files(input_directory)