Created
August 9, 2021 13:58
-
-
Save ahmedkhalf/783c0c0a0f1132118cc64ca90426029d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from shutil import which | |
import os | |
from tqdm import tqdm | |
import time | |
import logging | |
import subprocess | |
import sys | |
from pathlib import Path | |
from collections import Counter | |
import datetime | |
import shutil | |
class ColoredFormatter(logging.Formatter): | |
"""Logging Formatter to add colors and count warning / errors""" | |
grey = "\x1b[38m" | |
green = "\x1b[32m" | |
yellow = "\x1b[33m" | |
red = "\x1b[31m" | |
bold_red = "\x1b[31;1m" | |
reset = "\x1b[0m" | |
_format = "%(asctime)s - %(message)s" | |
FORMATS = { | |
logging.DEBUG: grey + _format + reset, | |
logging.INFO: green + _format + reset, | |
logging.WARNING: yellow + _format + reset, | |
logging.ERROR: red + _format + reset, | |
logging.CRITICAL: bold_red + _format + reset | |
} | |
def format(self, record): | |
log_fmt = self.FORMATS.get(record.levelno) | |
formatter = logging.Formatter(log_fmt) | |
return formatter.format(record) | |
class ColoredLogger(logging.Logger): | |
def __init__(self, name): | |
logging.Logger.__init__(self, name, logging.DEBUG) | |
color_formatter = ColoredFormatter() | |
console = logging.StreamHandler() | |
console.setFormatter(color_formatter) | |
self.addHandler(console) | |
return | |
def checkApp(appName, dependency) -> None: | |
logger = logging.getLogger(__name__) | |
if which(appName) is not None: | |
logger.debug(f"{appName} found!") | |
return | |
logger.error(f"{appName} not found, please install {dependency}") | |
sys.exit(1) | |
def check_requirements(): | |
checkApp("idevicepair", "libimobiledevice") | |
checkApp("ifuse", "ifuse") | |
checkApp("heif-convert", "libheif") | |
def pair_device(): | |
logger = logging.getLogger(__name__) | |
logger.debug("pairing device ...") | |
command = "idevicepair validate" | |
p = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) | |
p.communicate() | |
if p.returncode != 0: | |
logger.error("error pairing device") | |
sys.exit(2) | |
logger.debug("device paired!") | |
def get_mount_dir() -> Path: | |
path = Path.cwd() / "iphone_mount_temp" | |
return path | |
def mount_device(): | |
logger = logging.getLogger(__name__) | |
mount_path = get_mount_dir() | |
logger.debug(f"mounting device to {mount_path} ...") | |
# Create Mount Directory | |
if mount_path.exists(): | |
logger.error(f"mount path already exists") | |
sys.exit(3) | |
mount_path.mkdir() | |
command = f"ifuse {mount_path}" | |
p = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) | |
p.communicate() | |
if p.returncode != 0: | |
logger.error("error mounting device") | |
sys.exit(3) | |
logger.debug("device mounted!") | |
# Give it two seconds to mount | |
time.sleep(2) | |
def count_files(): | |
logger = logging.getLogger(__name__) | |
logger.debug("counting files ...") | |
mount_path = get_mount_dir() | |
files_path = mount_path / "DCIM" | |
if not files_path.exists(): | |
logger.error("cannot find dcim for count") | |
sys.exit("4") | |
extensions = [] | |
for file in files_path.rglob("*"): | |
if file.is_file(): | |
extensions.append(file.suffix) | |
logger.info(Counter(extensions)) | |
total = len(extensions) | |
logger.info(f"You have a total of {total} files!") | |
return total | |
def import_files(total): | |
logger = logging.getLogger(__name__) | |
logger.debug("importing files ...") | |
mount_path = get_mount_dir() | |
files_path = mount_path / "DCIM" | |
now = datetime.datetime.now() | |
out_path = Path.cwd() / now.strftime("%m-%d-%Y") | |
if not files_path.exists(): | |
logger.error("cannot find dcim for import") | |
sys.exit("4") | |
pbar = tqdm(total=total) | |
for file in files_path.rglob("*"): | |
if file.is_file(): | |
tqdm.write(str(file.relative_to(mount_path))) | |
mtime = os.path.getmtime(file) | |
creation = datetime.datetime.fromtimestamp(mtime) | |
# Make output folder | |
outfile = out_path / str(creation.year) / str(creation.month).zfill(2) | |
outfile.mkdir(parents=True, exist_ok=True) | |
# Make error folder | |
# errorfile = Path(outpath) / "MAC ONLY" | |
# outfile.mkdir(parents=True, exist_ok=True) | |
extension = file.suffix | |
if extension.lower() == ".heic": | |
outcmd = outfile / f"{str(file.stem)+'_h'}.png" | |
command = f"heif-convert -q 90 \"{file.absolute()}\" \"{outcmd.absolute()}\"" | |
tqdm.write(command) | |
p = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) | |
p.communicate() | |
if p.returncode != 0: | |
tqdm.write(f"Error Converting {outfile}") | |
else: | |
shutil.copy2(file, outfile) | |
pbar.update(1) | |
pbar.close() | |
def unmount_device(): | |
logger = logging.getLogger(__name__) | |
mount_path = get_mount_dir() | |
logger.debug(f"unmounting device from {mount_path} ...") | |
command = f"fusermount -u {mount_path}" | |
p = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) | |
p.communicate() | |
if p.returncode != 0: | |
logger.error("error unmounting device") | |
return False | |
mount_path.rmdir() | |
logger.debug("device unmounted!") | |
def main(): | |
logging.setLoggerClass(ColoredLogger) | |
check_requirements() | |
pair_device() | |
mount_device() | |
total = count_files() | |
import_files(total) | |
for _ in range(3): | |
if unmount_device() != False: | |
break | |
time.sleep(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment