Last active
January 29, 2023 01:04
-
-
Save brentvollebregt/e865e0a3a76bd400c504378ad91bd64c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Google Photos Takeout Processor | |
This tool takes an unzipped and merged (for takeouts above 10Gb) Google Photos Takeout and filters duplicates, applies | |
metadata and moves all files to an output folder with each file having the format | |
`YYYY-MM-DD_HH-MM-SS {original_filename}`. | |
Setup: | |
1. Install tqdm: `pip install tqdm` | |
2. Install loguru: `pip install loguru` | |
3. Download exiftool from https://exiftool.org/ and specify the binary location in EXIFTOOL_BINARY_LOCATION below. | |
- You can also just use `exiftool` as the value if it is on the PATH and can be found by cmd. | |
Typical usage: python google_photos_takeout_metadata_applier.py -i {folder containing takeout(s)} -o {output folder} | |
The target of this was to do something like https://github.com/TheLastGimbus/GooglePhotosTakeoutHelper but do support video formats also. | |
""" | |
import argparse | |
import datetime | |
import json | |
import os | |
import re | |
import subprocess | |
from collections import defaultdict | |
import hashlib | |
from pathlib import Path | |
import sys | |
from typing import List, Dict, Optional | |
from tqdm import tqdm # pip install tqdm | |
from loguru import logger # pip install loguru | |
# Path to the exiftool binary (docs: https://exiftool.org/exiftool_pod.html).
# "exiftool" alone also works if the binary can be found on the PATH.
EXIFTOOL_BINARY_LOCATION = r"C:\Users\Brent\Programs\exiftool.exe"
# Extensions (lower-case, without the dot) that this tool knows how to process
SUPPORTED_IMAGE_FORMATS = ["png", "jpg", "jpeg", "gif", "heic"]
SUPPORTED_VIDEO_FORMATS = ["mp4", "mov"]
# Setup logging
# Route INFO+ console output through tqdm.write so in-progress bars are not broken by log lines
logger.remove()
logger.add(lambda msg: tqdm.write(msg, end=""), format="{message}", level="INFO")
# Also keep a full debug log on disk; loguru expands "{time}" to a timestamped filename
logger.add("{time}.log", level="DEBUG", encoding="utf8")  # Set level="TRACE" to get more info
class ExifToolWrapper(object):
    """Context manager that keeps a single exiftool process open for many commands.

    Spawning exiftool once with `-stay_open` and streaming commands to it is far
    faster than launching a new process per file.
    src: https://stackoverflow.com/a/10075210/3774244
    """

    # exiftool prints this marker on stdout when it has finished a command
    sentinel = b"{ready}\r\n"

    def __enter__(self):
        self.process = subprocess.Popen(
            [EXIFTOOL_BINARY_LOCATION, "-stay_open", "True", "-@", "-"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT  # A lot of warnings are written to stderr by exiftool - this pipes them to stdout so they can be logged and exiftool doesn't block waiting for us to read the stream (as it just goes into what we are already reading)
        )
        logger.debug('Opened exiftool')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Ask exiftool to shut down, then close stdin and reap the process so it
        # doesn't linger as a zombie after the context exits.
        self.process.stdin.write(b"-stay_open\nFalse\n")
        self.process.stdin.flush()
        self.process.stdin.close()
        self.process.wait()
        logger.debug('Closed exiftool')

    def execute(self, *args):
        """Send one command (a tuple of exiftool arguments) and return its raw stdout output."""
        args = args + ("-execute\n",)
        self.process.stdin.write(str.join("\n", args).encode())
        self.process.stdin.flush()
        logger.trace(f'Wrote to exiftool: {args}')
        output = b""
        fd = self.process.stdout.fileno()
        # Read until exiftool signals that it has finished this command
        while not output.endswith(self.sentinel):
            output += os.read(fd, 4096)
        logger.trace(f'Read from exiftool: {output}')
        # Check for errors
        if b"0 image files updated" in output:
            logger.error(f"Error updating file with exiftool: {output}")
        return output[:-len(self.sentinel)]
class MediaFile:
    """ Class for keeping track of an individual media file """

    path: Path  # Location of the media file on disk
    metadata: Optional[Dict] = None  # Cached contents of the associated Google .json metadata file

    def __init__(self, path: Path):
        self.path = path

    def get_low_cost_hash(self) -> str:
        """ A md5 sum of part of this file. Intended to be used to check for **potential** matches. """
        hash_md5 = hashlib.md5()
        with open(str(self.path), "rb") as f:
            chunk = f.read(100000)  # Hash only the first ~100kb - a cheap duplicate pre-filter
            hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def get_hash(self) -> str:
        """ A md5 sum of this file """
        hash_md5 = hashlib.md5()
        with open(str(self.path), "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def __get_meta_data_json_file(self) -> Optional[Path]:
        """ Tries to identify the files associated .json file

        Raises:
            Exception: when no candidate metadata file exists for this media file.
        """
        # Typical: "photo.jpg" -> "photo.jpg.json"
        attempt1 = Path(str(self.path) + '.json')
        if attempt1.exists():
            return attempt1
        # Seems that jpg -> jpeg sometimes. Only swap the extension itself so that a
        # "jpg" appearing elsewhere in the path cannot corrupt the candidate path.
        if str(self.path).endswith('.jpg'):
            attempt2 = Path(str(self.path)[:-len('.jpg')] + '.jpeg.json')
            if attempt2.exists():
                return attempt2
        # This one was odd: "photo_o.jpg" -> "photo_.json"
        attempt3 = Path(str(self.path).replace('_o.jpg', '_.json'))
        if str(self.path).endswith('_o.jpg') and attempt3.exists():
            return attempt3
        # Photos edited in Google Photos (shouldn't match anymore after `pre_clean_directory`)
        attempt4 = Path(str(self.path).replace('-edited', '') + '.json')
        if '-edited.' in str(self.path) and attempt4.exists():
            return attempt4
        # Duplicated file names when uploading: "photo(1).jpg" -> "photo.jpg(1).json" (still has some issues)
        index_search = re.search(r"\((\d+)\)\.", self.path.name)
        if index_search is not None:
            index = index_search[1]
            attempt5 = self.path.parent / (self.path.name.replace(f'({index})', '') + f'({index}).json')
            if attempt5.exists():
                return attempt5
        # Looks like if the file name is too long, the json filename gets truncated (*facepalm*)
        attempt6 = self.path.parent / (str(self.path.name[0:46]) + '.json')
        if attempt6.exists():
            return attempt6
        # TODO Alternatively, load all of the .json files and use "title" to match file
        raise Exception(f'Could not find metadata file for {self.path}')

    def __get_meta_data_from_json(self) -> Dict:
        """ Finds the json file containing metadata for this file stored by Google that exists in the takeout """
        if self.metadata is not None:
            return self.metadata  # Cached
        file = self.__get_meta_data_json_file()
        if file is None:
            return {}
        with open(str(file)) as f:
            metadata = json.load(f)
        self.metadata = metadata  # Cache for later
        return metadata

    def set_file_meta_data_from_json(self, exif_tool: 'ExifToolWrapper'):
        """ Sets metadata for the current file using the data found in the associated json file """
        # Get metadata
        metadata = self.__get_meta_data_from_json()
        date_taken_formatted = self.get_original_date_created().strftime("%Y:%m:%d %H:%M:%S")
        latitude = metadata['geoData']['latitude']
        longitude = metadata['geoData']['longitude']
        altitude = metadata['geoData']['altitude']
        latitude_ref = 'N' if latitude >= 0 else 'S'
        longitude_ref = 'E' if longitude >= 0 else 'W'
        altitude_ref = '0' if altitude >= 0 else '1'  # 0 = Above Sea Level (previously keyed off longitude by mistake)
        # Identify arguments to pass per file type
        extension = self.path.suffix.lower()[1:]
        exiftool_args = [
            f'-FileCreateDate="{date_taken_formatted}"',
            f'-FileModifyDate="{date_taken_formatted}"',
        ]
        if extension in ['jpg', 'jpeg', 'heic']:
            exiftool_args.append(f'-exif:DateTimeOriginal="{date_taken_formatted}"')
            if latitude != 0:
                exiftool_args.append(f'-exif:gpslatitude={latitude}')
                exiftool_args.append(f'-exif:gpslatituderef={latitude_ref}')
            if longitude != 0:
                exiftool_args.append(f'-exif:gpslongitude={longitude}')
                exiftool_args.append(f'-exif:gpslongituderef={longitude_ref}')
            if altitude != 0:
                exiftool_args.append(f'-exif:GPSAltitude={altitude}')
                exiftool_args.append(f'-exif:GPSAltitudeRef={altitude_ref}')
        elif extension == 'png':
            exiftool_args.append(f'-png:CreationTime="{date_taken_formatted}"')
        elif extension == 'gif':
            pass  # GIF doesn't take what we have
        elif extension in SUPPORTED_VIDEO_FORMATS:
            exiftool_args.append(f'-quicktime:CreateDate="{date_taken_formatted}"')
            exiftool_args.append(f'-quicktime:ModifyDate="{date_taken_formatted}"')
            exiftool_args.append(f'-quicktime:TrackCreateDate="{date_taken_formatted}"')
            exiftool_args.append(f'-quicktime:TrackModifyDate="{date_taken_formatted}"')
            exiftool_args.append(f'-quicktime:MediaCreateDate="{date_taken_formatted}"')
            exiftool_args.append(f'-quicktime:MediaModifyDate="{date_taken_formatted}"')
        else:
            raise Exception(f"File type not supported: .{extension}")
        # Execute the write operation
        logger.debug(f"Setting metadata for {self.path} using {exiftool_args}")
        complete_args = [
            *exiftool_args,
            '-overwrite_original',  # Don't create copies of the originals
            '-m',  # Ignore minor errors
            str(self.path)
        ]
        exif_tool.execute(*complete_args)

    def get_original_date_created(self) -> datetime.datetime:
        """ Identify the original date created of the file using the metadata from the associated json file """
        metadata = self.__get_meta_data_from_json()
        assert int(metadata['photoTakenTime']['timestamp']) != 0
        date_taken = datetime.datetime.fromtimestamp(int(metadata['photoTakenTime']['timestamp']))
        return date_taken
def pre_clean_directory(root: Path):
    """
    Clean up the directory before processing
    - Any filenames ending in "-edited.{ext}" replace their original and take its place
    - Any .heic files have their associated .mp4 files removed
    """
    # Edited files: where an original also exists, the edited version wins
    edited_files = list(root.glob('**/*-edited.*'))
    for edited in tqdm(edited_files, desc='Cleaning up edited files', unit='files'):
        original = edited.with_name(edited.stem.replace('-edited', '')).with_suffix(edited.suffix)
        if not original.exists():
            continue
        logger.debug(f"Found edited file, {edited}, and original file, {original}. Deleting original and renaming edited file to original.")
        original.unlink()
        edited.rename(original)

    # HEIC live photos: drop the sibling .mp4 clip
    heic_files = list(root.glob('**/*.heic'))
    for heic in tqdm(heic_files, desc='Cleaning up HEIC-related MP4 files', unit='files'):
        sibling_mp4 = heic.with_suffix('.mp4')
        if sibling_mp4.exists():
            logger.debug(f"Found MP4 file related to an HEIC file: {sibling_mp4}. Deleting.")
            sibling_mp4.unlink()
def identify_all_supported_files(root: Path, delete_exiftool_tmp_file_when_found: bool) -> List[MediaFile]:
    """ Get a list of all supported files and optionally remove any temporary exiftool files while doing this """
    supported_extensions = SUPPORTED_IMAGE_FORMATS + SUPPORTED_VIDEO_FORMATS
    discovered: List[MediaFile] = []
    for candidate in tqdm(root.glob('**/*'), desc='Identifying files', unit='objects'):
        if not candidate.is_file():
            continue
        # Collect any supported files to be processed
        if candidate.suffix[1:].lower() in supported_extensions:
            discovered.append(MediaFile(candidate))
        # Remove any temporary exiftool files from previous early exits
        if delete_exiftool_tmp_file_when_found and candidate.name.endswith('_exiftool_tmp'):
            os.remove(str(candidate))
    return discovered
def create_low_cost_hash_buckets(files: List[MediaFile]) -> Dict[str, List[MediaFile]]:
    """ Bucket files by a cheap partial-content hash; files sharing a bucket are *potential* duplicates """
    buckets: Dict[str, List[MediaFile]] = defaultdict(list)
    for media_file in tqdm(files, desc='Getting low-cost hashes', unit='files'):
        buckets[media_file.get_low_cost_hash()].append(media_file)
    return buckets
def duplicate_files_using_low_cost_hash_buckets(buckets: Dict[str, List[MediaFile]]) -> List[List[MediaFile]]:
    """ Using the low cost hash buckets, fully hash only the files that share a bucket to confirm real duplicates """
    full_hash_groups: Dict[str, List[MediaFile]] = defaultdict(list)
    for low_cost_hash in tqdm(buckets, desc='Identifying duplicate files', unit='hash-buckets'):
        candidates = buckets[low_cost_hash]
        if len(candidates) <= 1:
            continue  # A unique low-cost hash cannot be a duplicate
        for media_file in candidates:
            full_hash_groups[media_file.get_hash()].append(media_file)
    return list(full_hash_groups.values())
def filter_duplicates(duplicate_file_groups: List[List['MediaFile']], files_to_process: List['MediaFile']) -> List['MediaFile']:
    """ Remove duplicate files from the files to process by keeping the first instance of the duplicates """
    # Use a set so the per-file membership test below is O(1) instead of scanning a list
    files_to_filter_out: set = set()
    for file_group in duplicate_file_groups:
        for file in file_group[1:]:  # Keep the first instance of each group
            logger.debug(f"Found duplicate file: {file}. Skipping.")
            files_to_filter_out.add(str(file.path.absolute()))
    return [
        media_file for media_file in files_to_process
        if str(media_file.path.absolute()) not in files_to_filter_out
    ]
def update_meta_data(files: List[MediaFile], exif_tool: ExifToolWrapper):
    """ Update the metadata for each media file provided """
    for media_file in tqdm(files, desc='Updating file metadata', unit='files'):
        media_file.set_file_meta_data_from_json(exif_tool)
def move_files(files: List[MediaFile], output_dir: Path):
    """ Move each file provided to the output folder and rename to `YYYY-MM-DD_HH-MM-SS {original_filename}` """
    for media_file in tqdm(files, desc='Moving files', unit='files'):
        # Validates we have a timestamp on everything before any file is moved
        assert 'photoTakenTime' in media_file.metadata and 'timestamp' in media_file.metadata['photoTakenTime']
        prefix = media_file.get_original_date_created().strftime("%Y-%m-%d_%H-%M-%S")
        destination = output_dir / f'{prefix} {media_file.path.name}'
        os.rename(str(media_file.path), str(destination))
def report_remaining_files(root: Path):
    """ Report the files that were either skipped due to duplicates or non-supported files that are still in the source directory """
    file_counts: Dict[str, int] = defaultdict(int)  # extension (with dot, lower-case) -> count
    for path_object in tqdm(root.glob('**/*'), desc='Identifying remaining files', unit='objects'):
        if path_object.is_file():
            file_counts[path_object.suffix.lower()] += 1
    # Log a small table, most common extensions first
    logger.info('\nRemaining files (including duplicates ignored):')
    logger.info(f'| {"Extension":^11} | {"Count":^7} |')
    logger.info(f'| {"-"*11} | {"-"*7} |')
    for extension, count in sorted(file_counts.items(), key=lambda item: item[1], reverse=True):
        logger.info(f'| {extension:>11} | {count:7} |')
@logger.catch(onerror=lambda e: sys.exit(1))
def main(input_directory: Path, output_directory: Path, delete_exiftool_tmp_files: bool):
    """ Run the full takeout pipeline: clean, discover, de-duplicate, tag and move media files """
    assert input_directory.exists()
    output_directory.mkdir(parents=True, exist_ok=True)

    # Step 1. Clear some files we won't want to process
    logger.info("Pre-cleaning input directory")
    pre_clean_directory(input_directory)

    # Step 2. Identify all images to process
    logger.info("Identifying all supported files")
    media_files: List[MediaFile] = identify_all_supported_files(input_directory, delete_exiftool_tmp_files)

    # Step 3. Filter duplicates
    logger.info("Filtering duplicates")
    potential_matches = create_low_cost_hash_buckets(media_files)  # Get any potential matches (using a low cost method)
    confirmed_duplicates = duplicate_files_using_low_cost_hash_buckets(potential_matches)  # Then identify duplicate files
    media_files_filtered = filter_duplicates(confirmed_duplicates, media_files)  # Deal with duplicates

    # Step 4. Update metadata using .json files
    logger.info("Updating metadata")
    with ExifToolWrapper() as exif_tool_wrapper:
        update_meta_data(media_files_filtered, exif_tool_wrapper)

    # Step 5. Move images into output
    logger.info("Moving files to output directory")
    move_files(media_files_filtered, output_directory)

    # Step 6. Report all files left over
    logger.info("Creating report of remaining files")
    report_remaining_files(input_directory)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='Google Photos Takeout Processor',
        usage='python google_photos_takeout_processor.py -i [INPUT TAKEOUT FOLDER] -o [OUTPUT FOLDER]',
        description=
        """This script takes all of your photos and videos from Google Photos takeout,
        fixes their exif data and file creation date using the data kept by Google,
        and then copies all files to one folder.
        """
    )
    parser.add_argument(
        '-i', '--input-folder',
        type=str,
        required=True,
        help='Input folder with contents of Google Photos takeout (unzipped)'
    )
    parser.add_argument(
        '-o', '--output-folder',
        type=str,
        required=False,
        default='./output',
        help='Output folder in which all photos will be placed'
    )
    parser.add_argument(
        '--dont-delete-exiftool-tmp-files',
        action='store_true',
        help="Don't delete *_exiftool_tmp files that exist from previous early exits"
    )
    arguments = parser.parse_args()

    input_directory = Path(arguments.input_folder).absolute()
    output_directory = Path(arguments.output_folder).absolute()
    delete_exiftool_tmp_files = not arguments.dont_delete_exiftool_tmp_files
    main(input_directory, output_directory, delete_exiftool_tmp_files)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Renames media files to be in the format YYYY-MM-DD_HH-MM-SS.{ext} | |
This will move files! Make sure you have a backup of your files before running this script. | |
Usage: | |
python media_renaming.py --input-folder [input-folder] --output-folder [output-folder] | |
""" | |
import argparse | |
import datetime | |
import os | |
import subprocess | |
from collections import defaultdict | |
import hashlib | |
from pathlib import Path | |
from typing import List, Dict, Optional | |
from tqdm import tqdm | |
# Path to the exiftool binary; "exiftool" alone also works if it is on the PATH
EXIFTOOL_BINARY_LOCATION = r"C:\Users\Brent\Programs\exiftool.exe"
# Extensions (lower-case, without the dot) that this tool knows how to process
SUPPORTED_IMAGE_FORMATS = ["png", "jpg", "jpeg", "gif"]
SUPPORTED_VIDEO_FORMATS = ["mp4", "mov"]
class ExifToolWrapper(object):
    """Context manager that keeps a single exiftool process open for many commands.
    src: https://stackoverflow.com/a/10075210
    """

    # exiftool prints this marker on stdout when it has finished a command
    sentinel = b"{ready}\r\n"

    def __enter__(self):
        self.process = subprocess.Popen(
            [EXIFTOOL_BINARY_LOCATION, "-stay_open", "True", "-@", "-"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # exiftool writes a lot of warnings to stderr. Merge them into stdout so they
            # are consumed by our reads below - with an unread stderr=subprocess.PIPE the
            # OS pipe buffer can fill up and deadlock exiftool mid-command.
            stderr=subprocess.STDOUT
        )
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.process.stdin.write(b"-stay_open\nFalse\n")
        self.process.stdin.flush()
        self.process.stdin.close()
        self.process.wait()  # Reap the child so it doesn't linger as a zombie
        print('Closed exiftool')

    def execute(self, *args):
        """Send one command (a tuple of exiftool arguments) and return its raw stdout output."""
        args = args + ("-execute\n",)
        self.process.stdin.write(str.join("\n", args).encode())
        self.process.stdin.flush()
        output = b""
        fd = self.process.stdout.fileno()
        # Read until exiftool signals that it has finished this command
        while not output.endswith(self.sentinel):
            output += os.read(fd, 4096)
        return output[:-len(self.sentinel)]
class MediaFile:
    """ Class for keeping track of an individual media file """

    path: Path  # Location of the media file on disk
    date_created: Optional[datetime.datetime] = None  # Populated by get_original_date_created

    def __init__(self, path: Path):
        self.path = path

    def get_low_cost_hash(self) -> str:
        """ A md5 sum of part of this file. Intended to be used to check for **potential** matches. """
        hash_md5 = hashlib.md5()
        with open(str(self.path), "rb") as f:
            chunk = f.read(10000)  # Read ~10kb
            hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def get_hash(self) -> str:
        """ A md5 sum of this file """
        hash_md5 = hashlib.md5()
        with open(str(self.path), "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def get_original_date_created(self, exif_tool: 'ExifToolWrapper'):
        """ Identify the original date created of the file using the timestamp on the file,
        sync the filesystem create/modify dates to it and cache it on `self.date_created`.

        Raises:
            Exception: when the file type is unsupported or no queried tag holds a parsable date.
        """
        exiftool_args = ['-s', '-s', '-s']  # -s -s -s: print tag values only, no tag names
        extension = self.path.suffix.lower()[1:]
        if extension in ['jpg', 'jpeg']:
            exiftool_args.append('-exif:DateTimeOriginal')
            exiftool_args.append('-XMP:DateCreated')
        elif extension == 'png':
            exiftool_args.append('-png:CreationTime')
            exiftool_args.append('-exif:DateTimeOriginal')
        elif extension in SUPPORTED_VIDEO_FORMATS:
            exiftool_args.append('-quicktime:CreateDate')
        else:
            raise Exception("Unsupported file")
        # Execute the read operation
        complete_args = [
            *exiftool_args,
            '-m',  # Ignore minor errors
            str(self.path)
        ]
        value = exif_tool.execute(*complete_args)
        # Try each returned tag value against the known date formats; first parsable wins.
        # (Previously a value matching neither format crashed instead of falling through
        # to the next returned value.)
        date: Optional[datetime.datetime] = None
        for clean_value in value.decode().split('\r\n'):
            if clean_value == '':
                continue
            for date_format in ('%Y:%m:%d %H:%M:%S', '%Y:%m:%d %H:%M:%S+12:00'):
                try:
                    date = datetime.datetime.strptime(clean_value, date_format)
                    break
                except ValueError:
                    continue
            if date is not None:
                break
        if date is None:
            raise Exception(f'No date found on {self.path}')
        # Update the create and modify date at the same time
        date_taken_formatted = date.strftime("%Y:%m:%d %H:%M:%S")
        complete_args = [
            f'-FileCreateDate="{date_taken_formatted}"',
            f'-FileModifyDate="{date_taken_formatted}"',
            '-overwrite_original',  # Don't create copies of the originals
            '-m',  # Ignore minor errors
            str(self.path)
        ]
        exif_tool.execute(*complete_args)
        self.date_created = date
def identify_all_supported_files(root: Path) -> List[MediaFile]:
    """ Get a list of all supported files """
    supported_extensions = SUPPORTED_IMAGE_FORMATS + SUPPORTED_VIDEO_FORMATS
    discovered: List[MediaFile] = []
    for candidate in tqdm(root.glob('**/*'), desc='Identifying files', unit='objects'):
        # Collect any supported files to be processed
        if candidate.is_file() and candidate.suffix[1:].lower() in supported_extensions:
            discovered.append(MediaFile(candidate))
    return discovered
def create_low_cost_hash_buckets(files: List[MediaFile]) -> Dict[str, List[MediaFile]]:
    """ Bucket files by a cheap partial-content hash; files sharing a bucket are *potential* duplicates """
    buckets: Dict[str, List[MediaFile]] = defaultdict(list)
    for media_file in tqdm(files, desc='Getting low-cost hashes', unit='files'):
        buckets[media_file.get_low_cost_hash()].append(media_file)
    return buckets
def duplicate_files_using_low_cost_hash_buckets(buckets: Dict[str, List[MediaFile]]) -> List[List[MediaFile]]:
    """ Using the low cost hash buckets, fully hash only the files that share a bucket to confirm real duplicates """
    full_hash_groups: Dict[str, List[MediaFile]] = defaultdict(list)
    for low_cost_hash in tqdm(buckets, desc='Identifying duplicate files', unit='hash-buckets'):
        candidates = buckets[low_cost_hash]
        if len(candidates) <= 1:
            continue  # A unique low-cost hash cannot be a duplicate
        for media_file in candidates:
            full_hash_groups[media_file.get_hash()].append(media_file)
    return list(full_hash_groups.values())
def filter_duplicates(duplicate_file_groups: List[List['MediaFile']], files_to_process: List['MediaFile']) -> List['MediaFile']:
    """ Remove duplicate files from the files to process by keeping the first instance of the duplicates """
    # Use a set so the per-file membership test below is O(1) instead of scanning a list
    files_to_filter_out: set = set()
    for file_group in duplicate_file_groups:
        for file in file_group[1:]:  # Keep the first instance of each group
            files_to_filter_out.add(str(file.path.absolute()))
    print(f'Duplicates to skip: {len(files_to_filter_out)}')
    return [
        media_file for media_file in files_to_process
        if str(media_file.path.absolute()) not in files_to_filter_out
    ]
def get_original_dates_created(files: List[MediaFile], exif_tool: ExifToolWrapper):
    """ Get the original date created for each file and make sure it exists """
    for media_file in tqdm(files, desc='Getting original dates', unit='files'):
        media_file.get_original_date_created(exif_tool)
def move_files(files: List[MediaFile], output_dir: Path):
    """ Move each file provided to the output folder and rename to `YYYY-MM-DD_HH-MM-SS {original_filename}` """
    for media_file in tqdm(files, desc='Moving files', unit='files'):
        assert media_file.date_created is not None
        prefix = media_file.date_created.strftime("%Y-%m-%d_%H-%M-%S")
        # Don't double-prefix files whose name already starts with the timestamp
        if media_file.path.name.startswith(prefix):
            new_name = media_file.path.name
        else:
            new_name = f'{prefix} {media_file.path.name}'
        os.rename(str(media_file.path), str(output_dir / new_name))
def report_remaining_files(root: Path):
    """ Report the files that were either skipped due to duplicates or non-supported files that are still in the source directory """
    file_counts: Dict[str, int] = defaultdict(int)  # extension (with dot, lower-case) -> count
    for path_object in tqdm(root.glob('**/*'), desc='Identifying remaining files', unit='objects'):
        if path_object.is_file():
            file_counts[path_object.suffix.lower()] += 1
    # Print a small table, most common extensions first
    print('\nRemaining files (including duplicates ignored):')
    print(f'| {"Extension":^11} | {"Count":^7} |')
    print(f'| {"-"*11} | {"-"*7} |')
    for extension, count in sorted(file_counts.items(), key=lambda item: item[1], reverse=True):
        print(f'| {extension:>11} | {count:7} |')
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='Google Photos Takeout Processor',
        usage='python google_photos_takeout_processor.py -i [INPUT TAKEOUT FOLDER] -o [OUTPUT FOLDER]',
        description=
        """This script takes all of your photos and videos from Google Photos takeout,
        fixes their exif data and file creation date using the data kept by Google,
        and then copies all files to one folder.
        """
    )
    parser.add_argument(
        '-i', '--input-folder',
        type=str,
        required=True,
        help='Input folder with contents of Google Photos takeout (unzipped)'
    )
    parser.add_argument(
        '-o', '--output-folder',
        type=str,
        required=False,
        default='./output',
        help='Output folder in which all photos will be placed'
    )
    arguments = parser.parse_args()

    input_directory = Path(arguments.input_folder).absolute()
    output_directory = Path(arguments.output_folder).absolute()
    assert input_directory.exists()
    output_directory.mkdir(parents=True, exist_ok=True)

    # Step 1. Identify all images to process
    media_files: List[MediaFile] = identify_all_supported_files(input_directory)
    # Step 2. Filter duplicates
    low_cost_hash_buckets = create_low_cost_hash_buckets(media_files)  # Get any potential matches (using a low cost method)
    duplicate_files = duplicate_files_using_low_cost_hash_buckets(low_cost_hash_buckets)  # Then identify duplicate files
    media_files_filtered = filter_duplicates(duplicate_files, media_files)  # Deal with duplicates
    # Step 3. Get dates created from files
    with ExifToolWrapper() as exif_tool_wrapper:
        get_original_dates_created(media_files_filtered, exif_tool_wrapper)
    # Step 4. Move images into output
    move_files(media_files_filtered, output_directory)
    # Step 5. Report all files left over
    report_remaining_files(input_directory)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment