Skip to content

Instantly share code, notes, and snippets.

@datavudeja
Forked from mnixry/recursive_convert.py
Created February 9, 2025 22:26
Show Gist options
  • Save datavudeja/547be10f3c5714b4d9f1edad4de88224 to your computer and use it in GitHub Desktop.
Save datavudeja/547be10f3c5714b4d9f1edad4de88224 to your computer and use it in GitHub Desktop.
Recursively convert videos in a specified folder with FFmpeg
import argparse
import re
import shutil
import subprocess
from datetime import datetime, timedelta
from fnmatch import fnmatch
from mimetypes import guess_type
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import NamedTuple, Optional, cast
try:
from rich import progress
except ImportError:
from pip._vendor.rich import progress
# Argument template appended after the ``ffmpeg`` executable. The two ``%s``
# placeholders are filled with the input path and the output path, in that
# order.
FFMPEG_ARGS = '-y -hwaccel auto -i "%s" -vcodec hevc -acodec aac "%s"'


def _build_argument_parser() -> argparse.ArgumentParser:
    """Create the command-line parser used by the script."""
    parser = argparse.ArgumentParser(
        description="Converts all files in a directory to a target format",
        epilog="Example: python convert.py --target-ext .mp4 --accepted-mime video/*",
    )
    # (flag or positional name, keyword arguments for ``add_argument``).
    # The order here is the order shown in ``--help``.
    option_table = (
        (
            "--target-ext",
            {
                "type": str,
                "default": ".mp4",
                "help": "The target file extension to convert to",
            },
        ),
        (
            "--accepted-mime",
            {
                "type": str,
                "default": "video/*",
                "help": "The accepted mime type to convert, wildcards are allowed",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Only print the commands that would be executed",
            },
        ),
        (
            "--converted-dir",
            {
                "type": Path,
                "default": None,
                "help": "The target directory to convert to, defaults to the 'parent/<name>_converted'",
            },
        ),
        (
            "path",
            {
                "type": Path,
                "help": "The folder path to convert",
            },
        ),
    )
    for option_name, option_settings in option_table:
        parser.add_argument(option_name, **option_settings)
    return parser


argument_parser = _build_argument_parser()
class FFmpegError(RuntimeError):
    """Error describing a failed FFmpeg command.

    Besides the usual exception arguments, the instance keeps the command
    that was run and the output captured from it.
    """

    def __init__(self, message: str, command: str, stderr: str, *args: object) -> None:
        # ``message`` and any extra positional values become ``self.args``.
        super().__init__(message, *args)
        # The command string that was executed.
        self.command = command
        # The text captured from the process.
        self.stderr = stderr
class FFMpegProgress(NamedTuple):
    """Snapshot of a running conversion, as yielded by ``FFmpegConvert``."""

    # Duration of the input, parsed from FFmpeg's ``Duration:`` line.
    total_duration: timedelta
    # Position in the media reached so far, parsed from ``out_time=``.
    processed_duration: timedelta
    # Wall-clock time that has passed since the process was started.
    time_elapsed: timedelta
class FFmpegConvert:
    """Run one ``ffmpeg`` command line and report its progress.

    The command is executed through the shell as
    ``ffmpeg -progress - -nostats <cmd>`` with stderr merged into stdout.
    Every output line is kept in ``stdout`` so that it can be inspected or
    written to a log after the run.
    """

    # Matches the ``Duration: HH:MM:SS.ff`` line describing the input.
    DUR_REGEX = re.compile(
        r"Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
    )
    # Matches the ``out_time=HH:MM:SS.ff...`` line of the ``-progress``
    # report; only the first two fractional digits are captured.
    TIME_REGEX = re.compile(
        r"out_time=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
    )
    # Number of fractional-second digits captured by the ``ms`` group of the
    # two patterns above (``\d{2}`` -> hundredths of a second).
    _FRACTION_DIGITS = 2

    @staticmethod
    def to_ms(precision: Optional[int] = None, **kwargs):
        """Build a ``timedelta`` from the named groups of a timestamp match.

        Args:
            precision: Number of decimal digits that the ``ms`` value
                represents as a fraction of a second (``2`` means
                hundredths, so ``ms="45"`` is 0.45 s). ``None`` keeps the
                legacy reading of ``ms`` as a literal millisecond count.
            **kwargs: ``hour``, ``min``, ``sec`` and ``ms`` values (strings
                or ints); missing ones default to 0.

        Returns:
            The corresponding ``timedelta``.
        """
        hours = int(kwargs.get("hour", 0))
        minutes = int(kwargs.get("min", 0))
        seconds = int(kwargs.get("sec", 0))
        fraction = int(kwargs.get("ms", 0))
        if precision is None:
            return timedelta(
                hours=hours, minutes=minutes, seconds=seconds, milliseconds=fraction
            )
        # Scale the captured digits to microseconds; timedelta rounds any
        # remaining fractional microsecond.
        return timedelta(
            hours=hours,
            minutes=minutes,
            seconds=seconds,
            microseconds=fraction * 1_000_000 / 10**precision,
        )

    def __init__(self, cmd: str, dry_run: bool = False) -> None:
        """Store the argument string to run.

        Args:
            cmd: Arguments appended after ``ffmpeg -progress - -nostats``.
            dry_run: When true, ``run_command_with_progress`` does nothing.
        """
        self.cmd = cmd
        self.dry_run = dry_run
        # Decoded, stripped output lines collected while the process runs.
        self.stdout = []

    @property
    def stdout_text(self) -> str:
        """All captured output lines joined with newlines."""
        return "\n".join(self.stdout)

    def run_command_with_progress(self):
        """Run the command, yielding an ``FFMpegProgress`` per progress line.

        Nothing is yielded until the input duration has been seen in the
        output. In dry-run mode the generator finishes immediately without
        starting a process.

        Raises:
            FFmpegError: If the process exits with a non-zero status; the
                captured output is attached as ``stderr``.
        """
        if self.dry_run:
            return
        total_duration = None
        cmd_with_progress = f"ffmpeg -progress - -nostats {self.cmd}"
        # NOTE(review): the command is a single string run with shell=True,
        # so any quoting of file names is the caller's responsibility.
        # The ``with`` block closes the stdin/stdout pipes when we are done.
        with subprocess.Popen(
            cmd_with_progress,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=False,
            shell=True,
        ) as process:
            start_time = datetime.now()
            try:
                assert process.stdout
                for line in iter(process.stdout.readline, b""):
                    stdout_line = line.decode("utf-8", errors="replace").strip()
                    self.stdout.append(stdout_line)
                    total_dur_match = FFmpegConvert.DUR_REGEX.search(stdout_line)
                    if total_dur_match and total_duration is None:
                        total_duration = self.to_ms(
                            precision=self._FRACTION_DIGITS,
                            **total_dur_match.groupdict(),
                        )
                    if total_duration and (
                        progress_time := FFmpegConvert.TIME_REGEX.search(stdout_line)
                    ):
                        processed = self.to_ms(
                            precision=self._FRACTION_DIGITS,
                            **progress_time.groupdict(),
                        )
                        yield FFMpegProgress(
                            total_duration, processed, datetime.now() - start_time
                        )
                # Block until the process has exited instead of spinning on
                # poll(), which would keep a CPU core busy.
                process.wait()
            finally:
                # Only reached with a live process when the consumer stopped
                # iterating early or an error occurred: best-effort stop so
                # the child does not outlive this generator.
                if process.poll() is None:
                    process.kill()
        if process.returncode != 0:
            raise FFmpegError(
                f"Error running command {self.cmd!r}",
                command=self.cmd,
                stderr=self.stdout_text,
            )
def _do_convert(
    source: Path,
    target: Path,
    relative_source_path: Path,
    target_root: Path,
    progress_bar: progress.Progress,
) -> Optional[Exception]:
    """Convert ``source`` with FFmpeg and place the result at ``target``.

    The output is first written into a temporary directory and copied to
    ``target`` only when the conversion succeeded, so a failed run never
    leaves a partial file at the destination.

    Args:
        source: File to convert.
        target: Final location of the converted file.
        relative_source_path: ``source`` as shown in the progress display.
        target_root: Directory that receives a timestamped ``.log`` file
            with FFmpeg's output when the command fails.
        progress_bar: Progress display that gets a task for this file.

    Returns:
        ``None`` on success, otherwise the exception raised while converting.
    """

    def describe(speed: float) -> str:
        # Format only the speed value; %-formatting the whole description
        # would break on file names that contain a "%".
        return (
            f"[cyan]{relative_source_path} -> [green]{target.name}[/] "
            f"speed=[red]{speed:.3f}x[/]"
        )

    # Kept in its own variable: the name bound by ``except ... as`` is
    # deleted when the handler ends and cannot be returned afterwards.
    error: Optional[Exception] = None
    job = progress_bar.add_task(describe(0.0))
    with TemporaryDirectory() as temp_dir:
        temp_target = Path(temp_dir) / target.name
        execute = FFmpegConvert(FFMPEG_ARGS % (source, temp_target))
        try:
            for status in execute.run_command_with_progress():
                processed = status.processed_duration.total_seconds()
                elapsed = status.time_elapsed.total_seconds()
                # Guard against a zero elapsed time on coarse clocks.
                speed = processed / elapsed if elapsed > 0 else 0.0
                progress_bar.update(
                    job,
                    completed=processed,
                    total=status.total_duration.total_seconds(),
                    description=describe(speed),
                )
                progress_bar.refresh()
        except Exception as caught:
            error = caught
            if isinstance(caught, FFmpegError):
                # Keep FFmpeg's full output next to the converted files.
                error_log_name = datetime.now().strftime("%Y%m%d%H%M%S") + ".log"
                with (target_root / error_log_name).open("wt", encoding="utf-8") as f:
                    f.write(execute.stdout_text)
        else:
            # Publish the result only after a clean run.
            shutil.copyfile(temp_target, target)
        finally:
            progress_bar.remove_task(job)
    return error
def main(progress_bar: progress.Progress):
    """Mirror the source folder into the target folder, converting media.

    Every file below the folder given on the command line is visited in
    sorted order. Files whose guessed MIME type matches ``--accepted-mime``
    are converted to ``--target-ext``; all other files, and files whose
    conversion failed, are copied unchanged. Files whose target already
    exists are skipped. With ``--dry-run`` nothing is written: the planned
    FFmpeg command or copy is printed instead.

    Args:
        progress_bar: Progress display used for tracking and messages.
    """
    args = argument_parser.parse_args()
    execute_dir = cast(Path, args.path).resolve().absolute()
    target_ext = cast(str, args.target_ext)
    accepted_mime = cast(str, args.accepted_mime)
    dry_run = cast(bool, args.dry_run)
    target_dir = cast(Optional[Path], args.converted_dir) or (
        execute_dir.parent / f"{execute_dir.name}_converted"
    )

    for source_path in progress_bar.track(
        sorted(path for path in execute_dir.glob("**/*") if path.is_file()),
    ):
        relative_source_path = source_path.relative_to(execute_dir)
        target_path = target_dir / relative_source_path

        mime_type, _ = guess_type(source_path)
        should_convert = fnmatch(mime_type or "", accepted_mime)
        if should_convert:
            target_path = target_path.with_suffix(target_ext)

        if target_path.is_file():
            progress_bar.log(
                f"Skipping [cyan]{relative_source_path}[/cyan] (already exists)"
            )
            continue

        if dry_run:
            # Report what would happen without touching the file system.
            if should_convert:
                progress_bar.print(
                    f"ffmpeg {FFMPEG_ARGS % (source_path, target_path)}"
                )
            else:
                progress_bar.print(f"Would copy {relative_source_path}")
            continue

        target_path.parent.mkdir(parents=True, exist_ok=True)

        if should_convert:
            error = _do_convert(
                source_path,
                target_path,
                relative_source_path,
                target_dir,
                progress_bar,
            )
            if not error:
                begin, end = source_path.stat().st_size, target_path.stat().st_size
                # An empty source file has no meaningful reduction ratio.
                reduced = 1 - (end / begin) if begin else 0.0
                progress_bar.log(
                    f"Converted [cyan]{relative_source_path}[/cyan] "
                    f"[blue]({begin:,} -> {end:,}), reduced: {reduced:.2%}"
                )
                continue
            progress_bar.print(f"Failed to convert {relative_source_path}: {error!r}")

        # Fallback for non-matching files and failed conversions.
        # NOTE(review): after a failed conversion the copy keeps the
        # converted suffix in its name - confirm this is intended.
        shutil.copyfile(source_path, target_path)
        progress_bar.print(f"Copied {relative_source_path}")
if __name__ == "__main__":
    # Default progress columns plus a spinner that turns into a check mark
    # once a task has finished.
    columns = [
        *progress.Progress.get_default_columns(),
        progress.SpinnerColumn(finished_text="✅"),
    ]
    with progress.Progress(*columns) as progress_bar:
        main(progress_bar)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment