Last active
July 25, 2024 11:04
-
-
Save salbahra/c9e95ce0ca1d2be83e6af63cc64538aa to your computer and use it in GitHub Desktop.
Processes Google Takeout data for Photos by pairing JSON and media files and updating media files with metadata from the JSON.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import json | |
from collections import defaultdict | |
import urllib.parse | |
from fractions import Fraction | |
from datetime import datetime | |
from PIL.ExifTags import GPSTAGS | |
import piexif | |
import subprocess | |
# Lower-case file suffixes that are treated as media when scanning albums.
MEDIA_EXTENSIONS = (
    # video containers
    '.avi', '.mp4', '.mov', '.mkv', '.m4v', '.mpg', '.wmv',
    # still images
    '.jpg', '.jpeg', '.png', '.heic', '.gif',
    # mobile video
    '.3gp',
)

# Default stem length used by truncate_filename() when pairing media with JSON.
JSON_FILENAME_MAX_LENGTH = 50

# Largest difference (decimal degrees) at which two coordinates count as equal.
GPS_TOLERANCE = 0.000001  # Approximately 0.1 meters at the equator
class Counter:
    """Running tallies of skipped and updated files, printed at the end of main()."""

    def __init__(self):
        # Every tally starts at zero.
        for tally in ('skipped_time', 'skipped_exif', 'updated_time', 'updated_exif'):
            setattr(self, tally, 0)


# Single module-level instance shared by the processing functions.
counter = Counter()
def get_metadata(file_path):
    """Run ``exiftool -json`` on *file_path* and return its first record.

    Returns an empty dict when exiftool produced no records, or when anything
    goes wrong (the error is printed, not raised).
    """
    command = ['exiftool', '-json', file_path]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        records = json.loads(completed.stdout)
        if records:
            return records[0]
        return {}
    except Exception as e:
        print(f"An error occurred: {e}")
        return {}
def get_creation_date(metadata):
    """Return the first parseable creation date in *metadata* as a Unix timestamp.

    The keys are tried in a fixed priority order and each value is parsed with
    the ``YYYY:MM:DD HH:MM:SS`` and ``YYYY/MM/DD HH:MM:SS`` formats. The parsed
    datetime is naive, so ``timestamp()`` interprets it in the local time zone.

    Args:
        metadata: Mapping of tag name to value (as returned by get_metadata()).

    Returns:
        The timestamp as an int, or None when no key holds a parseable date.
    """
    date_formats = ('%Y:%m:%d %H:%M:%S', '%Y/%m/%d %H:%M:%S')
    date_keys = (
        'CreateDate', 'CreationDate', 'DateTimeOriginal', 'MediaCreateDate',
        'TrackCreateDate', 'ModifyDate', 'MediaModifyDate', 'TrackModifyDate',
    )
    for key in date_keys:
        value = metadata.get(key)
        # Values decoded from JSON are not guaranteed to be strings; strptime
        # would raise TypeError on anything else, so skip those keys.
        if not isinstance(value, str):
            continue
        value = value.strip()
        for date_format in date_formats:
            try:
                return int(datetime.strptime(value, date_format).timestamp())
            except (ValueError, OverflowError, OSError):
                # ValueError: format mismatch; the others: a date the platform
                # cannot convert to a timestamp. Try the next format/key.
                continue
    return None
def delete_ignored_files(ignored_files):
    """Remove every path in *ignored_files*, printing the outcome for each one.

    A failed removal is reported and does not stop the remaining deletions.
    """
    for doomed_path in ignored_files:
        try:
            os.remove(doomed_path)
        except OSError as e:
            print(f"Error deleting {doomed_path}: {e}")
        else:
            print(f"Deleted ignored file: {doomed_path}")
def are_coordinates_equal(coord1, coord2, tolerance=GPS_TOLERANCE):
    """Return True when the two coordinates differ by at most *tolerance*."""
    difference = abs(coord1 - coord2)
    return difference <= tolerance
def get_exif_data(image_path):
    """Load the EXIF structure of *image_path* with piexif.

    Any failure is printed and an empty dict is returned instead.
    """
    try:
        loaded = piexif.load(image_path)
    except Exception as e:
        print(f"Error reading EXIF data from {image_path}: {e}")
        return {}
    return loaded
def get_decimal_coordinates(info):
    """Convert GPS latitude/longitude in *info* to signed decimal degrees, in place.

    For each of ``'GPSLatitude'`` and ``'GPSLongitude'`` present in *info*, the
    value is expected to be three ``(numerator, denominator)`` pairs (degrees,
    minutes, seconds). It is replaced by a float, negated when the matching
    ``'...Ref'`` entry is S or W (bytes or str). A missing ref leaves the value
    positive. Values that are already numbers are left untouched.

    A malformed value (wrong shape, zero denominator) is removed from *info* so
    callers see the coordinate as missing instead of crashing.

    Args:
        info: Dict keyed by GPS tag name; modified in place.

    Returns:
        The same *info* dict.
    """
    for key in ('GPSLatitude', 'GPSLongitude'):
        if key not in info:
            continue
        components = info[key]
        if isinstance(components, (int, float)):
            # Already decimal (e.g. the dict was converted before).
            continue
        try:
            degrees, minutes, seconds = (
                numerator / denominator for numerator, denominator in components
            )
        except (TypeError, ValueError, ZeroDivisionError):
            del info[key]
            continue
        coordinate = degrees + (minutes / 60.0) + (seconds / 3600.0)
        ref = info.get(f'{key}Ref')
        if isinstance(ref, bytes):
            ref = ref.decode('ascii', errors='ignore')
        if isinstance(ref, str) and ref.strip('\x00 ').upper() in ('S', 'W'):
            coordinate = -coordinate
        info[key] = coordinate
    return info
def convert_to_rational(number):
    """Return *number* as an exact ``(numerator, denominator)`` pair.

    The value goes through ``str()`` first, so a float is converted from its
    printed decimal form rather than its binary expansion.
    """
    exact = Fraction(str(number))
    numerator, denominator = exact.numerator, exact.denominator
    return numerator, denominator
def decimal_to_dms(decimal_coords):
    """Convert a ``(latitude, longitude)`` pair in decimal degrees to DMS rationals.

    Args:
        decimal_coords: Two-item sequence ``(lat, lon)`` of signed decimal degrees.

    Returns:
        ``(lat_dms, lon_dms)``. Each is a 4-tuple ``(degrees, minutes, seconds,
        direction)`` where the first three items are ``(numerator, denominator)``
        pairs from convert_to_rational() and direction is 'N'/'S' for latitude
        or 'E'/'W' for longitude. Seconds are rounded to 0.01.
    """
    def convert_to_dms(degree):
        is_positive = degree >= 0
        # Round once, in whole hundredths of an arc-second, and split with
        # divmod so that rounding carries into minutes/degrees. Rounding the
        # seconds on their own could yield 60.0 (e.g. 10.99999999 -> 10 59 60).
        hundredths = round(abs(degree) * 360000)
        degrees, remainder = divmod(hundredths, 360000)
        minutes, remainder = divmod(remainder, 6000)
        seconds = remainder / 100
        return degrees, minutes, seconds, is_positive

    def format_dms(degrees, minutes, seconds, is_positive, lat=True):
        if lat:
            direction = 'N' if is_positive else 'S'
        else:
            direction = 'E' if is_positive else 'W'
        return (
            convert_to_rational(degrees),
            convert_to_rational(minutes),
            convert_to_rational(seconds),
            direction,
        )

    lat, lon = decimal_coords
    lat_dms_formatted = format_dms(*convert_to_dms(lat), lat=True)
    lon_dms_formatted = format_dms(*convert_to_dms(lon), lat=False)
    return lat_dms_formatted, lon_dms_formatted
def get_gps_info(exif_data):
    """Return the GPS section of *exif_data* keyed by tag name, with decimal coordinates.

    Numeric GPS tag ids are translated through ``GPSTAGS`` (unknown ids are kept
    as-is) and the result is passed through get_decimal_coordinates().

    Args:
        exif_data: Dict with an optional ``'GPS'`` mapping of tag id to value.

    Returns:
        The converted dict, or None when *exif_data* is empty or has no
        (non-empty) ``'GPS'`` section.
    """
    # .get() instead of indexing: a dict without a 'GPS' key means "no GPS
    # data", not an error.
    gps_section = exif_data.get('GPS') if exif_data else None
    if not gps_section:
        return None
    gps_info = {GPSTAGS.get(tag, tag): value for tag, value in gps_section.items()}
    return get_decimal_coordinates(gps_info)
def set_exif_data(image_path, exif_data):
    """Serialize *exif_data* with piexif and write it into *image_path*.

    On success the shared ``counter.updated_exif`` tally is incremented, which
    is what the end-of-run summary reports. Failures are printed, not raised,
    and do not change the tally.
    """
    try:
        # Convert to EXIF bytes
        exif_bytes = piexif.dump(exif_data)
        # Insert updated EXIF data back to the image
        piexif.insert(exif_bytes, image_path)
    except Exception as e:
        print(f"Error writing EXIF data to {image_path}: {e}")
        return
    print(f"Updated EXIF data for {image_path}")
    counter.updated_exif += 1
def update_file_timestamps(file_path, timestamp):
    """Set both the access and modification time of *file_path* to *timestamp*.

    Also prints a confirmation and bumps the shared ``counter.updated_time``.
    """
    os.utime(file_path, times=(timestamp, timestamp))
    print(f"Updated file timestamps for {file_path}")
    counter.updated_time = counter.updated_time + 1
def _exif_original_timestamp(exif_data):
    """Return EXIF DateTimeOriginal as a local-time Unix timestamp, or None."""
    try:
        raw = exif_data['Exif'][piexif.ExifIFD.DateTimeOriginal]
    except (KeyError, TypeError):
        return None
    # piexif hands ASCII tags back as bytes; strptime needs str. Passing the
    # bytes straight through always failed, so the date comparison never ran.
    if isinstance(raw, bytes):
        raw = raw.decode('ascii', errors='ignore')
    if not isinstance(raw, str):
        return None
    try:
        parsed = datetime.strptime(raw.strip('\x00 '), "%Y:%m:%d %H:%M:%S")
        return int(parsed.timestamp())
    except (ValueError, OverflowError, OSError):
        return None


def process_media_file(media_path, json_path):
    """Apply the metadata in a Takeout JSON sidecar to one image file.

    Steps, in order:
      1. Files that are not .jpg/.jpeg/.png/.heic/.gif are skipped entirely.
      2. ``photoTakenTime.timestamp`` and ``geoData`` are read from the JSON.
      3. EXIF dates are rewritten when the file has no usable
         DateTimeOriginal or the JSON time is earlier than it.
      4. EXIF GPS tags are rewritten when they are missing or differ from the
         JSON coordinates by more than the tolerance.
      5. Changed EXIF is written back, then the file's timestamps are set to
         the JSON time.

    Args:
        media_path: Path of the image to update.
        json_path: Path of the JSON sidecar describing it.

    Returns:
        None. Progress and errors are printed; tallies go to ``counter``.
    """
    if not media_path.lower().endswith(('.jpg', '.jpeg', '.png', '.heic', '.gif')):
        print(f"Skipping unsupported file: {media_path}")
        return

    try:
        with open(json_path, 'r', encoding='utf-8') as json_file:
            metadata = json.load(json_file)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"Error decoding JSON file {json_path}: {e}")
        return
    except OSError as e:
        print(f"Error reading JSON file {json_path}: {e}")
        return
    if not isinstance(metadata, dict):
        print(f"Error decoding JSON file {json_path}: top-level value is not an object")
        return

    print('\nProcessing media file:', media_path)

    try:
        photo_taken_time = int((metadata.get('photoTakenTime') or {}).get('timestamp', 0))
    except (TypeError, ValueError):
        photo_taken_time = 0
    geo_data = metadata.get('geoData') or {}
    latitude = geo_data.get('latitude')
    longitude = geo_data.get('longitude')
    altitude = geo_data.get('altitude')

    exif_data = get_exif_data(media_path)

    # Update date/time
    updated = False
    if photo_taken_time:
        exif_timestamp = _exif_original_timestamp(exif_data)
        if not exif_timestamp or photo_taken_time < exif_timestamp:
            updated_time = datetime.fromtimestamp(photo_taken_time).strftime("%Y:%m:%d %H:%M:%S")
            if 'Exif' in exif_data:
                exif_data['Exif'][piexif.ExifIFD.DateTimeOriginal] = updated_time
                exif_data['Exif'][piexif.ExifIFD.DateTimeDigitized] = updated_time
                exif_data.setdefault('0th', {})[piexif.ImageIFD.DateTime] = updated_time
                updated = True
                print(f"Updated date/time for {media_path}: {updated_time}")
        else:
            print(f"Skipping {media_path}: EXIF date is older than JSON date", exif_timestamp, photo_taken_time)
            counter.skipped_time += 1

    # Update GPS data
    # NOTE(review): Takeout exports appear to use latitude 0.0 / longitude 0.0
    # as a "no location" placeholder; writing it would replace real EXIF GPS
    # data with 0,0, so that pair is treated as missing. Confirm against your
    # export if photos taken at exactly 0,0 matter.
    has_location = (
        latitude is not None
        and longitude is not None
        and not (latitude == 0 and longitude == 0)
    )
    if 'GPS' in exif_data and has_location:
        try:
            gps_info = get_gps_info(exif_data)
        except (KeyError, IndexError, TypeError, ValueError, ZeroDivisionError):
            # Unreadable existing GPS tags are treated as missing.
            gps_info = None
        lat_dms, lon_dms = decimal_to_dms((latitude, longitude))
        # Check if GPS info is missing or coordinates are different (within tolerance)
        if not gps_info or not (are_coordinates_equal(gps_info.get('GPSLatitude', 0), latitude) and
                                are_coordinates_equal(gps_info.get('GPSLongitude', 0), longitude)):
            exif_data['GPS'][piexif.GPSIFD.GPSLatitude] = lat_dms[:3]
            exif_data['GPS'][piexif.GPSIFD.GPSLatitudeRef] = lat_dms[3]
            exif_data['GPS'][piexif.GPSIFD.GPSLongitude] = lon_dms[:3]
            exif_data['GPS'][piexif.GPSIFD.GPSLongitudeRef] = lon_dms[3]
            if altitude is not None:
                exif_data['GPS'][piexif.GPSIFD.GPSAltitude] = convert_to_rational(round(abs(altitude), 4))
                # EXIF: 0 = above sea level, 1 = below sea level.
                exif_data['GPS'][piexif.GPSIFD.GPSAltitudeRef] = 1 if altitude < 0 else 0
            updated = True
            print(f"Updated GPS data for {media_path}: {lat_dms}, {lon_dms}, {altitude}")
        else:
            print(f"Skipping {media_path}: GPS data already present and correct in EXIF")
            counter.skipped_exif += 1

    if updated:
        set_exif_data(media_path, exif_data)
    # Set file times last so rewriting the EXIF does not bump them again.
    if photo_taken_time:
        update_file_timestamps(media_path, photo_taken_time)
def normalize_filename(filename):
    """Reduce *filename* to a canonical ``(name, extension)`` pair for matching.

    Both parts are lower-cased. On the name, in this order: a trailing
    ``-edited`` and a trailing ``.fullsizerender`` are dropped, a ``(n)``
    counter followed by more text is moved to the end, ``%``, ``'`` and ``_``
    become spaces, the result is URL-unquoted, whitespace runs collapse to one
    space and the ends are stripped. ``.jpeg`` is reported as ``.jpg``.

    Note that ``%`` has already been replaced by the time the URL-unquote
    step runs.
    """
    stem, suffix = os.path.splitext(filename)
    stem, suffix = stem.lower(), suffix.lower()

    # Suffix clean-up and "(n)" relocation, applied one after another.
    regex_steps = (
        (r'-edited$', ''),
        (r'\.fullsizerender$', ''),
        (r'(\(\d+\))(.+)$', r'\2\1'),
    )
    for pattern, replacement in regex_steps:
        stem = re.sub(pattern, replacement, stem)

    # Unsupported characters turn into underscores, underscores into spaces.
    stem = stem.replace('%', '_').replace('\'', '_').replace('_', ' ')
    stem = urllib.parse.unquote(stem)
    stem = re.sub(r'\s+', ' ', stem).strip()

    if suffix in ('.jpg', '.jpeg'):
        suffix = '.jpg'
    return stem, suffix
def truncate_filename(filename, max_length=JSON_FILENAME_MAX_LENGTH):
    """Cut the stem of *filename* to *max_length* characters, keeping the extension.

    Names whose stem is already short enough are returned unchanged.
    """
    stem, suffix = os.path.splitext(filename)
    if len(stem) > max_length:
        return stem[:max_length] + suffix
    return filename
def process_album(album_path):
    """Scan one album directory and pair media files with JSON sidecars.

    Files are visited in sorted name order so that the outcome (in particular
    which file of a duplicate group is kept) is the same on every run.

    A media name pairs with a JSON name when the media name starts with the
    JSON name, or the JSON name equals the truncated media name. When several
    JSON names pair with one media name, the reported match prefers an exact
    name, then the truncated name, then the longest JSON name.

    Args:
        album_path: Directory to scan (not recursive).

    Returns:
        A 6-tuple ``(unmatched_media, unmatched_json, ignored_files,
        matched_count, media_files, json_files)``:
          * unmatched_media / unmatched_json / ignored_files: lists of paths.
            ignored_files holds '-edited' / '.fullsizerender.' files and the
            extra files of matched duplicate groups.
          * matched_count: number of (name, extension) groups with a JSON.
          * media_files: ``{normalized_name: {ext: [paths]}}``.
          * json_files: ``{normalized_name: [paths]}``.
    """
    media_files = defaultdict(lambda: defaultdict(list))
    json_files = defaultdict(list)
    unmatched_media = []
    unmatched_json = []
    ignored_files = []
    matched_count = 0

    def names_pair(media_name, json_name):
        return media_name.startswith(json_name) or json_name == truncate_filename(media_name)

    def best_json_name(media_name):
        # Plain "first prefix wins" could report img_10 against img_1's JSON.
        candidates = [name for name in json_files if names_pair(media_name, name)]
        if not candidates:
            return None
        truncated = truncate_filename(media_name)
        return max(candidates, key=lambda name: (name == media_name, name == truncated, len(name)))

    # First pass: collect all files
    for filename in sorted(os.listdir(album_path)):
        if filename == 'metadata.json':
            continue
        full_path = os.path.join(album_path, filename)
        if not os.path.isfile(full_path):
            continue
        lowered = filename.lower()
        if lowered.endswith(MEDIA_EXTENSIONS):
            # Both markers are checked case-insensitively, matching how
            # normalize_filename() lower-cases names before stripping them.
            if '.fullsizerender.' in lowered or '-edited' in lowered:
                ignored_files.append(full_path)
                print(f" Ignored file: {filename}")
            else:
                normalized_name, ext = normalize_filename(filename)
                media_files[normalized_name][ext].append(full_path)
                print(f" Found media file: {filename}")
                print(f" Normalized name: {normalized_name}{ext}")
        elif lowered.endswith('.json'):
            json_name, _ = normalize_filename(filename.rsplit('.', 2)[0])  # Remove both .json and media extension
            json_files[json_name].append(full_path)
            print(f" Found JSON file: {filename}")
            print(f" Normalized name: {json_name}")

    # Second pass: match files
    print("\nPairing files:")
    for media_name, ext_dict in media_files.items():
        json_name = best_json_name(media_name)
        for ext, media_paths in ext_dict.items():
            if json_name is None:
                unmatched_media.extend(media_paths)
                print(f" Unmatched media: {', '.join(os.path.basename(m) for m in media_paths)}")
                print(f" Normalized name: {media_name}{ext}")
                continue
            json_paths = json_files[json_name]
            print(f" Matched: {os.path.basename(media_paths[0])} with {os.path.basename(json_paths[0])}")
            print(f" Media normalized: {media_name}{ext}")
            print(f" JSON normalized: {json_name}")
            matched_count += 1
            if len(media_paths) > 1:
                ignored_files.extend(media_paths[1:])
                print(f" Ignored duplicate media: {', '.join(os.path.basename(m) for m in media_paths[1:])}")
            if len(json_paths) > 1:
                print(f" Multiple JSON files found for {media_name}")

    # Check for any unmatched JSON files
    for json_name, json_paths in json_files.items():
        if not any(names_pair(media_name, json_name) for media_name in media_files):
            unmatched_json.extend(json_paths)
            print(f" Unmatched JSON: {', '.join(os.path.basename(j) for j in json_paths)}")
            print(f" Normalized name: {json_name}")

    return unmatched_media, unmatched_json, ignored_files, matched_count, media_files, json_files
def main():
    """Process every album under ``./google-photos/albums``.

    Order of operations:
      1. Scan each directory that contains media with process_album() and
         print a summary of matched, unmatched and ignored files.
      2. Delete the ignored files.
      3. For unmatched media, set file timestamps from the creation date that
         exiftool reports.
      4. For matched media, apply the best-matching JSON sidecar with
         process_media_file(), reusing the scan results from step 1.
      5. Print the tallies collected in ``counter``.
    """
    root_dir = './google-photos/albums'
    if not os.path.isdir(root_dir):
        print(f"Album directory not found: {root_dir}")
        return

    total_unmatched_media = []
    total_unmatched_json = []
    total_ignored_files = []
    total_matched_count = 0
    # (media_files, json_files) per album, kept so the matched pass does not
    # have to scan (and log) every album a second time.
    scanned_albums = []

    def best_json_name(media_name, json_files):
        # Same pairing rule as process_album(); with several candidates prefer
        # an exact name, then the truncated name, then the longest prefix, so
        # e.g. img_10 is not updated from img_1's JSON.
        truncated = truncate_filename(media_name)
        candidates = [
            name for name in json_files
            if media_name.startswith(name) or name == truncated
        ]
        if not candidates:
            return None
        return max(candidates, key=lambda name: (name == media_name, name == truncated, len(name)))

    for dirpath, _dirnames, filenames in os.walk(root_dir):
        if any(f.lower().endswith(MEDIA_EXTENSIONS) for f in filenames):
            print(f"\nProcessing album: {os.path.basename(dirpath)}")
            (unmatched_media, unmatched_json, ignored_files,
             matched_count, media_files, json_files) = process_album(dirpath)
            total_unmatched_media.extend(unmatched_media)
            total_unmatched_json.extend(unmatched_json)
            total_ignored_files.extend(ignored_files)
            total_matched_count += matched_count
            scanned_albums.append((media_files, json_files))

    print("\nSummary:")
    print(f"Total matched media files: {total_matched_count}")
    print(f"Total unmatched media files: {len(total_unmatched_media)}")
    print(f"Total unmatched JSON files: {len(total_unmatched_json)}")
    print(f"Total ignored files: {len(total_ignored_files)}")

    print("\nUnmatched media files:")
    for file in total_unmatched_media:
        print(f" {file}")

    print("\nUnmatched JSON files:")
    for file in total_unmatched_json:
        print(f" {file}")

    print("\nIgnored files:")
    for file in total_ignored_files:
        print(f" {file}")

    print("\nDeleting ignored files:")
    delete_ignored_files(total_ignored_files)

    print("\nProcessing unmatched files:")
    for media_path in total_unmatched_media:
        date = get_creation_date(get_metadata(media_path))
        if date:
            print(f"Updating {media_path} to {date}")
            try:
                update_file_timestamps(media_path, date)
            except OSError as e:
                # One unwritable file should not abort the whole run.
                print(f"Error updating timestamps for {media_path}: {e}")

    print("\nProcessing matched files:")
    for media_files, json_files in scanned_albums:
        for media_name, ext_dict in media_files.items():
            json_name = best_json_name(media_name, json_files)
            if json_name is None:
                continue
            for media_paths in ext_dict.values():
                for media_path in media_paths:
                    # Duplicates in this list were deleted above.
                    if not os.path.exists(media_path):
                        continue
                    for json_path in json_files[json_name]:
                        try:
                            process_media_file(media_path, json_path)
                        except OSError as e:
                            print(f"Error processing {media_path}: {e}")

    print("Media processing complete.")
    print(f"Files skipped for time updates: {counter.skipped_time}")
    print(f"Files skipped for EXIF updates: {counter.skipped_exif}")
    print(f"Files successfully updated for time: {counter.updated_time}")
    print(f"Files successfully updated for EXIF data: {counter.updated_exif}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment