Skip to content

Instantly share code, notes, and snippets.

@salbahra
Last active July 25, 2024 11:04
Show Gist options
  • Save salbahra/c9e95ce0ca1d2be83e6af63cc64538aa to your computer and use it in GitHub Desktop.
Save salbahra/c9e95ce0ca1d2be83e6af63cc64538aa to your computer and use it in GitHub Desktop.
Processes Google Takeout data for Photos by pairing JSON and media files and updating media files with metadata from the JSON.
import os
import re
import json
from collections import defaultdict
import urllib.parse
from fractions import Fraction
from datetime import datetime
from PIL.ExifTags import GPSTAGS
import piexif
import subprocess
# Lower-case extensions treated as media; compared against the lower-cased
# filename with str.endswith() when albums are scanned.
MEDIA_EXTENSIONS = ('.avi', '.mp4', '.mov', '.mkv', '.m4v', '.mpg', '.wmv', '.jpg', '.jpeg', '.png', '.heic', '.gif', '.3gp')
# Default ``max_length`` of truncate_filename(): the number of characters of
# the name (extension excluded) that are kept.
JSON_FILENAME_MAX_LENGTH = 50
# Default ``tolerance`` of are_coordinates_equal().
GPS_TOLERANCE = 1e-6 # Approximately 0.1 meters at the equator
class Counter:
    """Running tallies of files that were skipped or updated during a run."""

    def __init__(self):
        # Every tally starts at zero; the processing functions bump them.
        self.skipped_time = self.skipped_exif = 0
        self.updated_time = self.updated_exif = 0


# Single module-wide instance shared by all processing functions.
counter = Counter()
def get_metadata(file_path):
    """Return the metadata exiftool reports for *file_path* as a dict.

    Runs ``exiftool -json <file_path>`` and returns the first object of the
    JSON array printed on stdout. Any failure (exiftool missing, no output,
    unparsable output) is reported on stdout and yields an empty dict, so
    callers never have to handle an exception.
    """
    try:
        result = subprocess.run(
            ['exiftool', '-json', file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except (OSError, ValueError, TypeError) as e:
        # OSError: exiftool not installed / not executable.
        # ValueError / TypeError: the path cannot be passed as an argument.
        print(f"An error occurred while running exiftool on {file_path}: {e}")
        return {}

    if not result.stdout.strip():
        # exiftool explains failures on stderr; surface that instead of a
        # generic JSON "Expecting value" error.
        print(f"An error occurred: exiftool returned no metadata for {file_path}: {result.stderr.strip()}")
        return {}

    try:
        metadata = json.loads(result.stdout)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        print(f"An error occurred while parsing exiftool output for {file_path}: {e}")
        return {}

    # exiftool -json prints a list with one object per input file.
    if isinstance(metadata, list) and metadata and isinstance(metadata[0], dict):
        return metadata[0]
    return {}
def get_creation_date(metadata):
    """Return the first parsable creation date in *metadata* as a Unix timestamp.

    The keys are tried in a fixed priority order and each value is parsed with
    the ``YYYY:MM:DD HH:MM:SS`` and ``YYYY/MM/DD HH:MM:SS`` formats. The parsed
    value is a naive datetime, so ``timestamp()`` interprets it in the local
    time zone of the machine. Returns ``None`` when no key holds a parsable date.
    """
    date_formats = ('%Y:%m:%d %H:%M:%S', '%Y/%m/%d %H:%M:%S')
    date_keys = (
        'CreateDate', 'CreationDate', 'DateTimeOriginal', 'MediaCreateDate',
        'TrackCreateDate', 'ModifyDate', 'MediaModifyDate', 'TrackModifyDate',
    )
    for key in date_keys:
        value = metadata.get(key)
        # Non-string values (numbers, None) would make strptime raise
        # TypeError; treat them like any other unparsable value.
        if not isinstance(value, str):
            continue
        for date_format in date_formats:
            try:
                return int(datetime.strptime(value, date_format).timestamp())
            except (ValueError, OverflowError, OSError):
                # ValueError: wrong format or impossible date such as year 0.
                # OverflowError / OSError: date outside the platform's range.
                continue
    return None
def delete_ignored_files(ignored_files):
    """Remove every path in *ignored_files* from disk.

    Each deletion is reported on stdout. A path that cannot be removed is
    reported as an error and does not stop the remaining deletions.
    """
    for file in ignored_files:
        try:
            os.remove(file)
        except OSError as e:
            print(f"Error deleting {file}: {e}")
        else:
            print(f"Deleted ignored file: {file}")
def are_coordinates_equal(coord1, coord2, tolerance=GPS_TOLERANCE):
    """Return True when *coord1* and *coord2* differ by at most *tolerance*."""
    difference = abs(coord1 - coord2)
    return difference <= tolerance
def get_exif_data(image_path):
    """Load the EXIF data of *image_path* with piexif.

    Returns whatever ``piexif.load`` returns, or an empty dict (after printing
    the error) when loading fails for any reason.
    """
    try:
        exif = piexif.load(image_path)
    except Exception as e:
        print(f"Error reading EXIF data from {image_path}: {e}")
        exif = {}
    return exif
def get_decimal_coordinates(info):
    """Convert the DMS GPS entries of *info* to signed decimal degrees, in place.

    ``info['GPSLatitude']`` / ``info['GPSLongitude']`` are expected to be three
    ``(numerator, denominator)`` pairs (degrees, minutes, seconds). Each one is
    replaced by a float; the value is negated when the matching ``...Ref``
    entry is south or west. A missing ``...Ref`` is treated as north/east.

    Entries that are already plain numbers are left untouched, and entries
    that cannot be converted (wrong shape, zero denominator) are removed so
    that callers never see a half-converted value.

    Returns the same *info* dict.
    """
    for key in ('GPSLatitude', 'GPSLongitude'):
        if key not in info:
            continue
        components = info[key]
        if isinstance(components, (int, float)):
            # Already converted by an earlier call.
            continue
        try:
            degrees = float(components[0][0] / components[0][1])
            minutes = float(components[1][0] / components[1][1])
            seconds = float(components[2][0] / components[2][1])
        except (TypeError, IndexError, KeyError, ZeroDivisionError):
            # Malformed rational triplet: drop it rather than crash.
            del info[key]
            continue
        coordinate = degrees + (minutes / 60.0) + (seconds / 3600.0)
        # The reference may be bytes or str depending on who produced it.
        if info.get(f'{key}Ref') in (b'S', b'W', 'S', 'W'):
            coordinate = -coordinate
        info[key] = coordinate
    return info
def convert_to_rational(number):
    """Return *number* as an exact ``(numerator, denominator)`` pair.

    The value goes through its ``str()`` form, so a float such as ``0.1``
    becomes ``(1, 10)`` instead of its binary approximation.
    """
    ratio = Fraction(str(number))
    return ratio.numerator, ratio.denominator
def decimal_to_dms(decimal_coords):
    """Convert a ``(latitude, longitude)`` pair in decimal degrees to DMS.

    Returns ``(lat_dms, lon_dms)``. Each element is a 4-tuple
    ``(degrees, minutes, seconds, direction)``: the first three items are
    ``(numerator, denominator)`` pairs from ``convert_to_rational`` and
    ``direction`` is ``'N'``/``'S'`` for latitude or ``'E'``/``'W'`` for
    longitude. Seconds are rounded to two decimal places.
    """
    def convert_to_dms(degree):
        is_positive = degree >= 0
        # Round once, in whole hundredths of an arc-second, then split with
        # integer arithmetic. Truncating minutes and rounding seconds
        # separately could produce 60.0 seconds (e.g. 10.95 degrees became
        # 10 deg 56 min 60.0 s) instead of carrying into the minutes.
        hundredths = round(abs(degree) * 3600 * 100)
        whole_minutes, second_hundredths = divmod(hundredths, 6000)
        degrees, minutes = divmod(whole_minutes, 60)
        return degrees, minutes, second_hundredths / 100, is_positive

    def format_dms(degrees, minutes, seconds, is_positive, lat=True):
        if lat:
            direction = 'N' if is_positive else 'S'
        else:
            direction = 'E' if is_positive else 'W'
        return (
            convert_to_rational(degrees),
            convert_to_rational(minutes),
            convert_to_rational(seconds),
            direction,
        )

    lat, lon = decimal_coords
    lat_dms_formatted = format_dms(*convert_to_dms(lat), lat=True)
    lon_dms_formatted = format_dms(*convert_to_dms(lon), lat=False)
    return lat_dms_formatted, lon_dms_formatted
def get_gps_info(exif_data):
    """Return the GPS block of *exif_data* keyed by tag name, or ``None``.

    Numeric GPS tag ids are translated to names through ``GPSTAGS`` (unknown
    ids are kept as they are) and the result is passed through
    ``get_decimal_coordinates``. ``None`` is returned when *exif_data* is
    empty or has no (or an empty) ``'GPS'`` entry.
    """
    # .get() instead of indexing: a dict without a 'GPS' key must not raise.
    gps_ifd = exif_data.get('GPS') if exif_data else None
    if not gps_ifd:
        return None
    gps_info = {GPSTAGS.get(gps_tag, gps_tag): value for gps_tag, value in gps_ifd.items()}
    return get_decimal_coordinates(gps_info)
def set_exif_data(image_path, exif_data):
    """Serialize *exif_data* with piexif and write it into *image_path*.

    On success the update is reported and ``counter.updated_exif`` is
    incremented; nothing else in the file increments that tally, so without
    it the final summary would always report zero. On failure the error is
    printed and the tally is left untouched.
    """
    try:
        # Convert to EXIF bytes
        exif_bytes = piexif.dump(exif_data)
        # Insert updated EXIF data back to the image
        piexif.insert(exif_bytes, image_path)
    except Exception as e:
        print(f"Error writing EXIF data to {image_path}: {e}")
        return
    print(f"Updated EXIF data for {image_path}")
    counter.updated_exif += 1
def update_file_timestamps(file_path, timestamp):
    """Set the access and modification time of *file_path* to *timestamp*.

    *timestamp* is a Unix timestamp in seconds. A failure (missing file,
    timestamp the platform cannot represent) is printed and skipped, like the
    other file helpers in this script, so one bad file does not abort the
    whole run. ``counter.updated_time`` is incremented only on success.
    """
    try:
        os.utime(file_path, (timestamp, timestamp))
    except (OSError, OverflowError, ValueError) as e:
        print(f"Error updating file timestamps for {file_path}: {e}")
        return
    print(f"Updated file timestamps for {file_path}")
    counter.updated_time += 1
def process_media_file(media_path, json_path):
    """Copy date and GPS metadata from a JSON sidecar into an image file.

    Steps:
      1. Skip files whose extension is not one of the handled image types.
      2. Read ``photoTakenTime.timestamp`` and ``geoData`` from *json_path*.
      3. Set the EXIF date fields when the image has no readable
         ``DateTimeOriginal`` or the JSON time is earlier than it.
      4. Write the GPS position when the EXIF GPS block is missing it or it
         differs from the JSON position.
      5. Save the EXIF data if anything changed, then set the file's
         access/modification time to the JSON time.

    Problems reading the JSON file are printed and end processing of this
    file. Returns ``None``.
    """
    if not media_path.lower().endswith(('.jpg', '.jpeg', '.png', '.heic', '.gif')):
        print(f"Skipping unsupported file: {media_path}")
        return

    try:
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(json_path, 'r', encoding='utf-8') as json_file:
            metadata = json.load(json_file)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON file {json_path}: {e}")
        return
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading JSON file {json_path}: {e}")
        return
    if not isinstance(metadata, dict):
        print(f"Error decoding JSON file {json_path}: top-level value is not an object")
        return

    print('\nProcessing media file:', media_path)

    try:
        photo_taken_time = int((metadata.get('photoTakenTime') or {}).get('timestamp', 0))
    except (TypeError, ValueError):
        photo_taken_time = 0
    geo_data = metadata.get('geoData') or {}
    latitude = geo_data.get('latitude')
    longitude = geo_data.get('longitude')
    altitude = geo_data.get('altitude')
    # NOTE(review): a 0.0/0.0 pair is treated as "no location recorded" so it
    # is never written over real GPS data. Takeout appears to export that
    # pair for photos without a location - confirm against your export.
    has_location = (
        isinstance(latitude, (int, float))
        and isinstance(longitude, (int, float))
        and not (latitude == 0 and longitude == 0)
    )

    exif_data = get_exif_data(media_path)

    # Update date/time
    updated = False
    if photo_taken_time:
        exif_timestamp = None
        raw_date = (exif_data.get('Exif') or {}).get(piexif.ExifIFD.DateTimeOriginal)
        if raw_date is not None:
            try:
                # piexif hands text tags back as bytes (the GPS reference
                # checks in this file compare against b'S' / b'W'); strptime
                # only accepts str, so decode first. Without this the parse
                # always failed and the EXIF date was always overwritten.
                if isinstance(raw_date, bytes):
                    raw_date = raw_date.decode('ascii', errors='replace')
                exif_timestamp = int(datetime.strptime(raw_date.strip('\x00 '), "%Y:%m:%d %H:%M:%S").timestamp())
            except (AttributeError, TypeError, ValueError, OverflowError, OSError):
                exif_timestamp = None
        if not exif_timestamp or photo_taken_time < exif_timestamp:
            updated_time = datetime.fromtimestamp(photo_taken_time).strftime("%Y:%m:%d %H:%M:%S")
            if 'Exif' in exif_data:
                exif_data['Exif'][piexif.ExifIFD.DateTimeOriginal] = updated_time
                exif_data['Exif'][piexif.ExifIFD.DateTimeDigitized] = updated_time
                exif_data.setdefault('0th', {})[piexif.ImageIFD.DateTime] = updated_time
                updated = True
                print(f"Updated date/time for {media_path}: {updated_time}")
        else:
            print(f"Skipping {media_path}: EXIF date is older than JSON date", exif_timestamp, photo_taken_time)
            counter.skipped_time += 1

    # Update GPS data
    if 'GPS' in exif_data and has_location:
        try:
            gps_info = get_gps_info(exif_data)
        except (KeyError, IndexError, TypeError, ValueError, ZeroDivisionError):
            # An unreadable GPS block is handled like a missing one.
            gps_info = None
        lat_dms, lon_dms = decimal_to_dms((latitude, longitude))
        existing_lat = gps_info.get('GPSLatitude') if gps_info else None
        existing_lon = gps_info.get('GPSLongitude') if gps_info else None
        already_correct = (
            isinstance(existing_lat, (int, float))
            and isinstance(existing_lon, (int, float))
            and are_coordinates_equal(existing_lat, latitude)
            and are_coordinates_equal(existing_lon, longitude)
        )
        # Check if GPS info is missing or coordinates are different (within tolerance)
        if not already_correct:
            gps_ifd = exif_data['GPS']
            gps_ifd[piexif.GPSIFD.GPSLatitude] = lat_dms[:3]
            gps_ifd[piexif.GPSIFD.GPSLatitudeRef] = lat_dms[3]
            gps_ifd[piexif.GPSIFD.GPSLongitude] = lon_dms[:3]
            gps_ifd[piexif.GPSIFD.GPSLongitudeRef] = lon_dms[3]
            if isinstance(altitude, (int, float)):
                gps_ifd[piexif.GPSIFD.GPSAltitude] = convert_to_rational(round(abs(altitude), 4))
                # EXIF GPSAltitudeRef: 0 = above sea level, 1 = below sea level.
                gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 1 if altitude < 0 else 0
            updated = True
            print(f"Updated GPS data for {media_path}: {lat_dms}, {lon_dms}, {altitude}")
        else:
            print(f"Skipping {media_path}: GPS data already present and correct in EXIF")
            counter.skipped_exif += 1

    if updated:
        set_exif_data(media_path, exif_data)
    if photo_taken_time:
        update_file_timestamps(media_path, photo_taken_time)
def normalize_filename(filename):
    """Return ``(name, ext)`` for *filename* in a canonical, comparable form.

    Both parts are lower-cased. A trailing ``-edited`` and then a trailing
    ``.fullsizerender`` are removed from the name, a ``(n)`` counter followed
    by more text is moved to the end, ``%``, ``'`` and ``_`` become spaces,
    whitespace runs collapse to one space and the name is stripped.
    ``.jpeg`` is reported as ``.jpg``.
    """
    stem, suffix = os.path.splitext(filename)
    stem, suffix = stem.lower(), suffix.lower()

    # Drop the trailing markers, in this order.
    for trailing_marker in (r'-edited$', r'\.fullsizerender$'):
        stem = re.sub(trailing_marker, '', stem)

    # Move (n) to the end if present
    stem = re.sub(r'(\(\d+\))(.+)$', r'\2\1', stem)

    # '%' and "'" are first treated like '_', and every '_' then becomes a
    # space, so all three map straight to a space.
    stem = stem.translate(str.maketrans({'%': ' ', '\'': ' ', '_': ' '}))

    # NOTE(review): every '%' is already gone at this point, so this decode
    # has nothing left to act on; it is kept to preserve the original steps.
    stem = urllib.parse.unquote(stem)

    # Collapse whitespace runs and trim both ends.
    stem = re.sub(r'\s+', ' ', stem).strip()

    # Handle jpg/jpeg case
    if suffix == '.jpeg':
        suffix = '.jpg'
    return stem, suffix
def truncate_filename(filename, max_length=JSON_FILENAME_MAX_LENGTH):
    """Shorten the name part of *filename* to at most *max_length* characters.

    The extension found by ``os.path.splitext`` is not counted and is
    re-attached after truncation. A name that is already short enough is
    returned unchanged.
    """
    stem, suffix = os.path.splitext(filename)
    return filename if len(stem) <= max_length else stem[:max_length] + suffix
def _find_json_match(media_name, json_files):
    """Return the key of *json_files* that best matches *media_name*, or None.

    Preference order: an identical name, a name equal to the truncated media
    name, then the longest non-empty JSON name that is a prefix of the media
    name. Ranking the candidates keeps ``img 10`` from being paired with the
    sidecar of ``img 1`` just because that one happened to be seen first.
    """
    if media_name in json_files:
        return media_name
    truncated = truncate_filename(media_name)
    if truncated in json_files:
        return truncated
    prefixes = [name for name in json_files if name and media_name.startswith(name)]
    return max(prefixes, key=len, default=None)


def process_album(album_path):
    """Scan one album directory and pair media files with JSON sidecars.

    Returns a 6-tuple ``(unmatched_media, unmatched_json, ignored_files,
    matched_count, media_files, json_files)``:

    * ``unmatched_media`` / ``unmatched_json``: paths without a partner.
    * ``ignored_files``: ``-edited`` / ``.fullsizerender.`` variants plus
      every duplicate (same normalized name and extension) after the first
      of a matched media file. The caller deletes these.
    * ``matched_count``: number of (name, extension) groups with a sidecar.
    * ``media_files``: ``{normalized_name: {ext: [paths]}}``.
    * ``json_files``: ``{normalized_name: [paths]}``.

    Progress is printed to stdout.
    """
    media_files = defaultdict(lambda: defaultdict(list))
    json_files = defaultdict(list)
    unmatched_media = []
    unmatched_json = []
    ignored_files = []
    matched_count = 0

    # First pass: collect all files. Sorted so that which duplicate is kept
    # does not depend on the directory listing order.
    for filename in sorted(os.listdir(album_path)):
        if filename == 'metadata.json':
            continue
        full_path = os.path.join(album_path, filename)
        if not os.path.isfile(full_path):
            # Sub-directories are visited separately by the caller's walk.
            continue
        lowered = filename.lower()
        if lowered.endswith(MEDIA_EXTENSIONS):
            # NOTE(review): the '-edited' test is case-sensitive while the
            # '.fullsizerender.' test is not; kept as found because the
            # ignored files get deleted.
            if '.fullsizerender.' in lowered or '-edited' in filename:
                ignored_files.append(full_path)
                print(f" Ignored file: {filename}")
            else:
                normalized_name, ext = normalize_filename(filename)
                media_files[normalized_name][ext].append(full_path)
                print(f" Found media file: {filename}")
                print(f" Normalized name: {normalized_name}{ext}")
        elif lowered.endswith('.json'):
            json_name, _ = normalize_filename(filename.rsplit('.', 2)[0])  # Remove both .json and media extension
            json_files[json_name].append(full_path)
            print(f" Found JSON file: {filename}")
            print(f" Normalized name: {json_name}")

    # Second pass: match files
    print("\nPairing files:")
    matched_json_names = set()
    for media_name, ext_dict in media_files.items():
        json_name = _find_json_match(media_name, json_files)
        if json_name is not None:
            matched_json_names.add(json_name)
        for ext, media_paths in ext_dict.items():
            if json_name is None:
                unmatched_media.extend(media_paths)
                print(f" Unmatched media: {', '.join(os.path.basename(m) for m in media_paths)}")
                print(f" Normalized name: {media_name}{ext}")
                continue
            json_paths = json_files[json_name]
            print(f" Matched: {os.path.basename(media_paths[0])} with {os.path.basename(json_paths[0])}")
            print(f" Media normalized: {media_name}{ext}")
            print(f" JSON normalized: {json_name}")
            matched_count += 1
            if len(media_paths) > 1:
                ignored_files.extend(media_paths[1:])
                print(f" Ignored duplicate media: {', '.join(os.path.basename(m) for m in media_paths[1:])}")
            if len(json_paths) > 1:
                print(f" Multiple JSON files found for {media_name}")

    # Check for any unmatched JSON files
    for json_name, json_paths in json_files.items():
        if json_name not in matched_json_names:
            unmatched_json.extend(json_paths)
            print(f" Unmatched JSON: {', '.join(os.path.basename(j) for j in json_paths)}")
            print(f" Normalized name: {json_name}")

    return unmatched_media, unmatched_json, ignored_files, matched_count, media_files, json_files


def main(root_dir='./google-photos/albums'):
    """Process every album directory below *root_dir*.

    1. Scan each directory that contains media and pair media with sidecars.
    2. Print a summary and the unmatched / ignored files.
    3. Delete the ignored files. This is destructive and not confirmed.
    4. For unmatched media, set the file times from the metadata exiftool
       reports.
    5. For matched media, apply the best-matching sidecar through
       ``process_media_file``.
    6. Print the global counters.
    """
    total_unmatched_media = []
    total_unmatched_json = []
    total_ignored_files = []
    total_matched_count = 0
    # (media_files, json_files) of every album, reused for the matched pass so
    # that each album is scanned (and logged) only once.
    albums = []

    for dirpath, _dirnames, filenames in os.walk(root_dir):
        if not any(f.lower().endswith(MEDIA_EXTENSIONS) for f in filenames):
            continue
        print(f"\nProcessing album: {os.path.basename(dirpath)}")
        unmatched_media, unmatched_json, ignored_files, matched_count, media_files, json_files = process_album(dirpath)
        total_unmatched_media.extend(unmatched_media)
        total_unmatched_json.extend(unmatched_json)
        total_ignored_files.extend(ignored_files)
        total_matched_count += matched_count
        albums.append((media_files, json_files))

    print("\nSummary:")
    print(f"Total matched media files: {total_matched_count}")
    print(f"Total unmatched media files: {len(total_unmatched_media)}")
    print(f"Total unmatched JSON files: {len(total_unmatched_json)}")
    print(f"Total ignored files: {len(total_ignored_files)}")

    print("\nUnmatched media files:")
    for file in total_unmatched_media:
        print(f" {file}")
    print("\nUnmatched JSON files:")
    for file in total_unmatched_json:
        print(f" {file}")
    print("\nIgnored files:")
    for file in total_ignored_files:
        print(f" {file}")

    print("\nDeleting ignored files:")
    delete_ignored_files(total_ignored_files)

    print("\nProcessing unmatched files:")
    for media_path in total_unmatched_media:
        date = get_creation_date(get_metadata(media_path))
        if date:
            print(f"Updating {media_path} to {date}")
            update_file_timestamps(media_path, date)

    print("\nProcessing matched files:")
    for media_files, json_files in albums:
        for media_name, ext_dict in media_files.items():
            json_name = _find_json_match(media_name, json_files)
            if json_name is None:
                continue
            for media_paths in ext_dict.values():
                for media_path in media_paths:
                    # Duplicates deleted in the previous step no longer exist.
                    if not os.path.exists(media_path):
                        continue
                    for json_path in json_files[json_name]:
                        process_media_file(media_path, json_path)

    print("Media processing complete.")
    print(f"Files skipped for time updates: {counter.skipped_time}")
    print(f"Files skipped for EXIF updates: {counter.skipped_exif}")
    print(f"Files successfully updated for time: {counter.updated_time}")
    print(f"Files successfully updated for EXIF data: {counter.updated_exif}")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment