Skip to content

Instantly share code, notes, and snippets.

@ahmedkhalf
Created August 9, 2021 13:58
Show Gist options
  • Save ahmedkhalf/783c0c0a0f1132118cc64ca90426029d to your computer and use it in GitHub Desktop.
Save ahmedkhalf/783c0c0a0f1132118cc64ca90426029d to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
from shutil import which
import os
from tqdm import tqdm
import time
import logging
import subprocess
import sys
from pathlib import Path
from collections import Counter
import datetime
import shutil
class ColoredFormatter(logging.Formatter):
    """Logging formatter that colors each record according to its level.

    Every record is rendered as ``"<asctime> - <message>"`` wrapped in the
    ANSI escape sequence mapped to its level in ``FORMATS`` and followed by
    the reset sequence. Records whose level is not listed in ``FORMATS``
    (custom levels) are rendered with the same layout but without color.
    """

    grey = "\x1b[38m"
    green = "\x1b[32m"
    yellow = "\x1b[33m"
    red = "\x1b[31m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    _format = "%(asctime)s - %(message)s"

    # Level number -> complete format string (color + layout + reset).
    FORMATS = {
        logging.DEBUG: grey + _format + reset,
        logging.INFO: green + _format + reset,
        logging.WARNING: yellow + _format + reset,
        logging.ERROR: red + _format + reset,
        logging.CRITICAL: bold_red + _format + reset,
    }

    # Format string -> delegate Formatter. Keyed by the format string so the
    # cache stays correct even if a subclass overrides FORMATS.
    _delegates = {}

    def format(self, record):
        """Return *record* formatted with the layout/color for its level."""
        # Fall back to the uncolored layout so custom levels keep their
        # timestamp instead of degrading to a bare "%(message)s".
        log_fmt = self.FORMATS.get(record.levelno, self._format)
        delegate = self._delegates.get(log_fmt)
        if delegate is None:
            # Build each delegate once rather than once per log record.
            delegate = self._delegates[log_fmt] = logging.Formatter(log_fmt)
        return delegate.format(record)
class ColoredLogger(logging.Logger):
    """Logger preset to DEBUG that writes through a ``ColoredFormatter``.

    Each instance attaches its own ``logging.StreamHandler`` (created with
    the handler's default stream) whose formatter is a ``ColoredFormatter``.
    """

    def __init__(self, name):
        super().__init__(name, logging.DEBUG)
        handler = logging.StreamHandler()
        handler.setFormatter(ColoredFormatter())
        self.addHandler(handler)
def checkApp(appName, dependency) -> None:
    """Verify that the executable *appName* can be found on the PATH.

    Logs a debug message when the executable is found. Otherwise logs an
    error telling the user to install *dependency* and exits the process
    with status 1.
    """
    log = logging.getLogger(__name__)
    if which(appName) is None:
        log.error(f"{appName} not found, please install {dependency}")
        sys.exit(1)
    log.debug(f"{appName} found!")
def check_requirements():
    """Check, in order, every external tool this script shells out to.

    Each entry pairs the executable name with the package the user should
    install to obtain it; the first missing executable ends the process
    (see ``checkApp``).
    """
    required = (
        ("idevicepair", "libimobiledevice"),
        ("ifuse", "ifuse"),
        ("heif-convert", "libheif"),
    )
    for executable, package in required:
        checkApp(executable, package)
def pair_device():
    """Run ``idevicepair validate`` and exit with status 2 if it fails.

    The command's standard output is captured and discarded; only its exit
    status is inspected.
    """
    log = logging.getLogger(__name__)
    log.debug("pairing device ...")
    result = subprocess.run(
        "idevicepair validate", stdout=subprocess.PIPE, shell=True
    )
    if result.returncode != 0:
        log.error("error pairing device")
        sys.exit(2)
    log.debug("device paired!")
def get_mount_dir() -> Path:
    """Return the mount point: ``iphone_mount_temp`` inside the current working directory."""
    return Path.cwd().joinpath("iphone_mount_temp")
def mount_device():
    """Create the mount directory and mount the device on it with ``ifuse``.

    Exits the process with status 3 when the mount directory already exists
    or when ``ifuse`` cannot be started or reports failure. On an ``ifuse``
    failure the directory created here is removed again so that the next
    run is not blocked by a stale "mount path already exists" error.
    """
    logger = logging.getLogger(__name__)
    mount_path = get_mount_dir()
    logger.debug(f"mounting device to {mount_path} ...")

    # Create Mount Directory
    if mount_path.exists():
        logger.error("mount path already exists")
        sys.exit(3)
    mount_path.mkdir()

    # Pass an argument list instead of a shell string so a working directory
    # containing spaces or shell metacharacters is handed to ifuse intact.
    try:
        result = subprocess.run(
            ["ifuse", str(mount_path)], stdout=subprocess.PIPE
        )
        mounted = result.returncode == 0
    except OSError:
        # ifuse could not be executed at all.
        mounted = False

    if not mounted:
        logger.error("error mounting device")
        try:
            mount_path.rmdir()
        except OSError:
            # Best-effort cleanup; the mount failure is what gets reported.
            logger.debug(f"could not remove {mount_path}")
        sys.exit(3)
    logger.debug("device mounted!")

    # Give it two seconds to mount
    time.sleep(2)
def count_files():
    """Count the regular files under ``DCIM`` on the mounted device.

    Logs a per-extension tally and the overall total at INFO level.

    Returns:
        The total number of regular files found (recursively) under DCIM.

    Exits the process with status 4 when the DCIM directory does not exist.
    """
    logger = logging.getLogger(__name__)
    logger.debug("counting files ...")
    files_path = get_mount_dir() / "DCIM"
    if not files_path.exists():
        logger.error("cannot find dcim for count")
        # An int is required here: sys.exit("4") would print "4" to stderr
        # and exit with status 1 instead of 4.
        sys.exit(4)

    extensions = Counter(
        entry.suffix for entry in files_path.rglob("*") if entry.is_file()
    )
    logger.info(extensions)
    total = sum(extensions.values())
    logger.info(f"You have a total of {total} files!")
    return total
def _convert_heic(source, destination) -> bool:
    """Convert *source* to *destination* with ``heif-convert -q 90``; return True on success."""
    # An argument list (no shell) keeps file names containing quotes, "$" or
    # backticks from being interpreted by a shell.
    command = ["heif-convert", "-q", "90", str(source), str(destination)]
    tqdm.write(" ".join(command))
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE)
    except OSError:
        # heif-convert could not be executed at all.
        return False
    return result.returncode == 0


def import_files(total):
    """Copy every file under the device's ``DCIM`` into a dated local tree.

    Files land in ``<cwd>/<MM-DD-YYYY of today>/<year>/<month>/`` where year
    and month come from each file's modification time. Files with a
    ``.heic`` extension (any case) are converted with ``heif-convert`` to
    ``<stem>_h.png``; all other files are copied with ``shutil.copy2``.
    A failed conversion is reported and the import continues.

    Args:
        total: Number of files expected; used as the progress bar total.

    Exits the process with status 4 when the DCIM directory does not exist.
    """
    logger = logging.getLogger(__name__)
    logger.debug("importing files ...")
    mount_path = get_mount_dir()
    files_path = mount_path / "DCIM"
    out_path = Path.cwd() / datetime.datetime.now().strftime("%m-%d-%Y")
    if not files_path.exists():
        logger.error("cannot find dcim for import")
        # An int is required here: sys.exit("4") would exit with status 1.
        sys.exit(4)

    # The context manager closes the progress bar even if a copy raises.
    with tqdm(total=total) as pbar:
        for file in files_path.rglob("*"):
            if not file.is_file():
                continue
            tqdm.write(str(file.relative_to(mount_path)))
            modified = datetime.datetime.fromtimestamp(os.path.getmtime(file))

            # Make output folder
            outfile = out_path / str(modified.year) / str(modified.month).zfill(2)
            outfile.mkdir(parents=True, exist_ok=True)

            if file.suffix.lower() == ".heic":
                target = outfile / f"{file.stem}_h.png"
                if not _convert_heic(file.absolute(), target.absolute()):
                    # Name the file that failed, not the output directory.
                    tqdm.write(f"Error Converting {file}")
            else:
                shutil.copy2(file, outfile)
            pbar.update(1)
def unmount_device():
    """Unmount the device with ``fusermount -u`` and remove the mount directory.

    Returns:
        ``True`` when the device was unmounted and the directory removed,
        ``False`` when ``fusermount`` could not be run or reported failure
        (the directory is left in place in that case).
    """
    logger = logging.getLogger(__name__)
    mount_path = get_mount_dir()
    logger.debug(f"unmounting device from {mount_path} ...")

    # Pass an argument list instead of a shell string so a mount path
    # containing spaces or shell metacharacters is handled correctly.
    try:
        result = subprocess.run(
            ["fusermount", "-u", str(mount_path)], stdout=subprocess.PIPE
        )
        unmounted = result.returncode == 0
    except OSError:
        # fusermount could not be executed at all.
        unmounted = False

    if not unmounted:
        logger.error("error unmounting device")
        return False
    mount_path.rmdir()
    logger.debug("device unmounted!")
    return True
def main():
    """Run the import: check tools, pair, mount, count, import, unmount.

    Once the device is mounted, the unmount step always runs, including
    when counting or importing exits or raises, so the device is not left
    mounted and the temporary mount directory is not left behind.
    Unmounting is attempted up to three times, one second apart.
    """
    logging.setLoggerClass(ColoredLogger)
    check_requirements()
    pair_device()
    mount_device()
    try:
        total = count_files()
        import_files(total)
    finally:
        for _ in range(3):
            # unmount_device() signals failure with False; anything else
            # means the device was unmounted.
            if unmount_device() is not False:
                break
            time.sleep(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment