-
-
Save datavudeja/547be10f3c5714b4d9f1edad4de88224 to your computer and use it in GitHub Desktop.
Recursively convert videos in a specified folder with FFmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import re | |
import shutil | |
import subprocess | |
from datetime import datetime, timedelta | |
from fnmatch import fnmatch | |
from mimetypes import guess_type | |
from pathlib import Path | |
from tempfile import TemporaryDirectory | |
from typing import NamedTuple, Optional, cast | |
try: | |
from rich import progress | |
except ImportError: | |
from pip._vendor.rich import progress | |
# Argument template appended after the "ffmpeg" executable name.
# The two "%s" placeholders are filled, in order, with the source path and
# the destination path; both are wrapped in double quotes for the shell.
FFMPEG_ARGS = '-y -hwaccel auto -i "%s" -vcodec hevc -acodec aac "%s"'

# Command-line interface of the script. The epilog example includes the
# required positional ``path`` and quotes the MIME wildcard so that a shell
# does not expand it before the script sees it.
argument_parser = argparse.ArgumentParser(
    description="Converts all files in a directory to a target format",
    epilog=(
        "Example: python convert.py "
        '--target-ext .mp4 --accepted-mime "video/*" ./videos'
    ),
)
# Suffix given to every converted file.
argument_parser.add_argument(
    "--target-ext",
    type=str,
    default=".mp4",
    help="The target file extension to convert to",
)
# fnmatch-style pattern matched against each file's guessed MIME type.
argument_parser.add_argument(
    "--accepted-mime",
    type=str,
    default="video/*",
    help="The accepted mime type to convert, wildcards are allowed",
)
argument_parser.add_argument(
    "--dry-run",
    action="store_true",
    help="Only print the commands that would be executed",
)
# None means "derive the output directory from the input path".
argument_parser.add_argument(
    "--converted-dir",
    type=Path,
    default=None,
    help="The target directory to convert to, defaults to the 'parent/<name>_converted'",
)
argument_parser.add_argument(
    "path",
    type=Path,
    help="The folder path to convert",
)
class FFmpegError(RuntimeError):
    """Error raised when an FFmpeg command does not complete successfully.

    Attributes:
        command: The command string that was being run.
        stderr: The captured process output supplied by the raiser.
    """

    def __init__(self, message: str, command: str, stderr: str, *args: object) -> None:
        """Store the failing command and its output alongside the message.

        Args:
            message: Human-readable description, forwarded to RuntimeError.
            command: The command string that failed.
            stderr: Output text captured from the process.
            *args: Extra positional values forwarded to RuntimeError.
        """
        super().__init__(message, *args)
        self.command = command
        self.stderr = stderr
class FFMpegProgress(NamedTuple):
    """Snapshot of a running conversion, produced once per progress report."""

    # Duration parsed from the "Duration: ..." line of the process output.
    total_duration: timedelta
    # Position parsed from the most recent "out_time=..." line.
    processed_duration: timedelta
    # Wall-clock time between process start and this report.
    time_elapsed: timedelta
class FFmpegConvert:
    """Run one FFmpeg command and report its progress while it runs.

    The command is started with ``-progress - -nostats`` and stderr merged
    into stdout. Every output line is recorded in ``self.stdout`` and
    scanned for the total duration and the current output position.
    """

    # Matches the "Duration: HH:MM:SS.xx" text in the process output.
    DUR_REGEX = re.compile(
        r"Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
    )
    # Matches the "out_time=HH:MM:SS.xx" progress lines (first two
    # fractional digits only).
    TIME_REGEX = re.compile(
        r"out_time=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
    )

    def __init__(self, cmd: str, dry_run: bool = False) -> None:
        """Remember the command to run.

        Args:
            cmd: Arguments placed after ``ffmpeg -progress - -nostats``.
            dry_run: When true, ``run_command_with_progress`` does nothing.
        """
        self.cmd = cmd
        self.dry_run = dry_run
        # Decoded, stripped output lines collected while the process runs.
        self.stdout = []

    @staticmethod
    def to_ms(precision: Optional[int] = None, **kwargs):
        """Build a ``timedelta`` from ``hour``/``min``/``sec``/``ms`` parts.

        Args:
            precision: Accepted for backward compatibility; not used.
            **kwargs: Any of ``hour``, ``min``, ``sec`` and ``ms``; missing
                parts default to 0. When ``ms`` is a string (as captured by
                the class regexes) it is read as the digits following the
                decimal point, so ``"50"`` means half a second. When ``ms``
                is a number it is taken as milliseconds.

        Returns:
            The corresponding ``timedelta``.
        """
        hours = int(kwargs.get("hour", 0))
        minutes = int(kwargs.get("min", 0))
        seconds = int(kwargs.get("sec", 0))
        fraction = kwargs.get("ms", 0)
        if isinstance(fraction, str):
            # The regexes capture fractional-second digits, not a count of
            # milliseconds: scale by the number of digits that were captured.
            digits = fraction.strip()
            microseconds = int(digits) * 1_000_000 // 10 ** len(digits)
        else:
            microseconds = int(fraction) * 1000
        return timedelta(
            hours=hours, minutes=minutes, seconds=seconds, microseconds=microseconds
        )

    @property
    def stdout_text(self) -> str:
        """All recorded output lines joined with newlines."""
        return "\n".join(self.stdout)

    def run_command_with_progress(self):
        """Run the command, yielding an ``FFMpegProgress`` per progress line.

        Nothing is run or yielded when ``dry_run`` is set. Progress is only
        yielded after a total duration has been seen in the output.

        Raises:
            FFmpegError: If the process exits with a non-zero return code.
        """
        if self.dry_run:
            return
        total_duration = None
        # NOTE(review): the command is a single string run through the shell
        # (shell=True), so any file names embedded in ``self.cmd`` must
        # already be safely quoted by the caller.
        cmd_with_progress = f"ffmpeg -progress - -nostats {self.cmd}"
        # The context manager closes the pipes and reaps the process on exit.
        with subprocess.Popen(
            cmd_with_progress,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=False,
            shell=True,
        ) as p:
            start_time = datetime.now()
            assert p.stdout
            try:
                for line in iter(p.stdout.readline, b""):
                    stdout_line = line.decode("utf-8", errors="replace").strip()
                    self.stdout.append(stdout_line)
                    total_dur_match = FFmpegConvert.DUR_REGEX.search(stdout_line)
                    if total_dur_match and total_duration is None:
                        total_duration = self.to_ms(**total_dur_match.groupdict())
                    if total_duration and (
                        progress_time := FFmpegConvert.TIME_REGEX.search(stdout_line)
                    ):
                        elapsed_time = self.to_ms(**progress_time.groupdict())
                        yield FFMpegProgress(
                            total_duration, elapsed_time, datetime.now() - start_time
                        )
                # Block until the process exits instead of spinning on poll().
                p.wait()
            finally:
                # Reached early when the consumer abandons the generator or
                # an error occurs: do not leave the child process running.
                if p.poll() is None:
                    p.kill()
        if p.returncode != 0:
            raise FFmpegError(
                f"Error running command {self.cmd!r}",
                command=self.cmd,
                stderr=self.stdout_text,
            )
def _do_convert(
    source: Path,
    target: Path,
    relative_source_path: Path,
    target_root: Path,
    progress_bar: progress.Progress,
):
    """Convert ``source`` into ``target`` while showing a progress task.

    FFmpeg writes into a temporary directory first; the result is copied to
    ``target`` only when the conversion succeeds, so a failed run does not
    leave a partial file at the destination.

    Args:
        source: File to convert.
        target: Destination file; its name is reused for the temporary file.
        relative_source_path: Source path shown in the task description.
        target_root: Directory that receives a timestamped ``.log`` file with
            the FFmpeg output when FFmpeg itself fails.
        progress_bar: Progress display that hosts the per-file task.

    Returns:
        ``None`` on success, otherwise the exception raised during conversion.
    """
    # Kept in its own variable: the name bound by ``except ... as`` is
    # deleted when the except clause ends and cannot be returned afterwards.
    error = None

    def describe(speed: float) -> str:
        # Built with an f-string so a "%" in a file name cannot break it.
        return (
            f"[cyan]{relative_source_path} -> [green]{target.name}[/] "
            f"speed=[red]{speed:.3f}x[/]"
        )

    job = progress_bar.add_task(describe(0.0))
    with TemporaryDirectory() as temp_dir:
        temp_target = Path(temp_dir) / target.name
        execute = FFmpegConvert(FFMPEG_ARGS % (source, temp_target))
        try:
            for status in execute.run_command_with_progress():
                processed = status.processed_duration.total_seconds()
                elapsed = status.time_elapsed.total_seconds()
                # Speed is media seconds processed per wall-clock second.
                speed = processed / elapsed if elapsed > 0 else 0.0
                progress_bar.update(
                    job,
                    completed=processed,
                    total=status.total_duration.total_seconds(),
                    description=describe(speed),
                )
                progress_bar.refresh()
        except Exception as exc:
            error = exc
            if isinstance(exc, FFmpegError):
                # Keep the full FFmpeg output next to the converted files.
                error_log_name = datetime.now().strftime("%Y%m%d%H%M%S") + ".log"
                with (target_root / error_log_name).open("wt", encoding="utf-8") as f:
                    f.write(execute.stdout_text)
        else:
            # Must happen before the temporary directory is removed.
            shutil.copyfile(temp_target, target)
        finally:
            progress_bar.remove_task(job)
    return error
def main(progress_bar: progress.Progress):
    """Convert or copy every file under the requested folder.

    Files whose guessed MIME type matches ``--accepted-mime`` are converted
    to ``--target-ext``; every other file, and any file whose conversion
    failed, is copied as-is. Files whose target already exists are skipped.
    With ``--dry-run`` nothing is created, converted or copied; the intended
    action for each file is only printed.

    Args:
        progress_bar: Progress display used for tracking and messages.
    """
    args = argument_parser.parse_args()
    execute_dir = cast(Path, args.path).resolve().absolute()
    target_ext = cast(str, args.target_ext)
    accepted_mime = cast(str, args.accepted_mime)
    dry_run = cast(bool, args.dry_run)
    target_dir = cast(Optional[Path], args.converted_dir) or (
        execute_dir.parent / f"{execute_dir.name}_converted"
    )

    # Collected up front so the file list is fixed before anything is written.
    source_files = sorted(path for path in execute_dir.glob("**/*") if path.is_file())
    for source_path in progress_bar.track(source_files):
        relative_source_path = source_path.relative_to(execute_dir)
        target_path = target_dir / relative_source_path
        mime_type, _ = guess_type(source_path)
        should_convert = fnmatch(mime_type or "", accepted_mime)
        if should_convert:
            target_path = target_path.with_suffix(target_ext)

        if target_path.is_file():
            progress_bar.log(
                f"Skipping [cyan]{relative_source_path}[/cyan] (already exists)"
            )
            continue

        if dry_run:
            if should_convert:
                # The real run writes to a temporary file first; the final
                # destination is shown here instead.
                progress_bar.print(
                    f"ffmpeg {FFMPEG_ARGS % (source_path, target_path)}"
                )
            else:
                progress_bar.print(f"Would copy {relative_source_path}")
            continue

        target_path.parent.mkdir(parents=True, exist_ok=True)
        if should_convert:
            error = _do_convert(
                source_path,
                target_path,
                relative_source_path,
                target_dir,
                progress_bar,
            )
            if not error:
                begin, end = source_path.stat().st_size, target_path.stat().st_size
                # A zero-byte source has no meaningful reduction ratio.
                reduced = 1 - (end / begin) if begin else 0.0
                progress_bar.log(
                    f"Converted [cyan]{relative_source_path}[/cyan] "
                    f"[blue]({begin:,} -> {end:,}), reduced: {reduced:.2%}"
                )
                continue
            progress_bar.print(f"Failed to convert {relative_source_path}: {error!r}")

        # Reached for non-matching files and as a fallback after a failed
        # conversion (the copy then keeps the target extension).
        shutil.copyfile(source_path, target_path)
        progress_bar.print(f"Copied {relative_source_path}")
if __name__ == "__main__":
    # rich's default columns, followed by a spinner that shows a check mark
    # once a task has finished.
    columns = (
        *progress.Progress.get_default_columns(),
        progress.SpinnerColumn(finished_text="✅"),
    )
    with progress.Progress(*columns) as progress_bar:
        main(progress_bar)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment