# Source: GitHub Gist Cycor/e6e2a1c4e2d907bf7b1a552b7a417472
# Author: @Cycor, created January 10, 2022 14:21
# (GitHub page navigation text from the web capture was dropped.)
from datetime import datetime
import sys
import math
import ffmpeg
import subprocess
from pathlib import Path
import pprint
# Lower-case file suffixes (including the leading dot) that are treated as
# video files when scanning a directory.
VIDEO_SUFFIXES = [
    ".mp4",
    ".m4v",
    ".mov",
    ".wmv",
    ".avi",
    ".flv",
    ".mkv",
    ".webm",
    ".3gp",  # was misspelled ".gp3", so 3GP files were never matched
    ".ts",
    ".mpeg",
    ".mpg",
    ".vob",
]
# Codec names that are accepted as-is inside an .mp4 file; streams using any
# other codec are re-encoded.
#
# Deliberately left out (they were commented out in the original list):
#   video: "mpeg4"
#   audio: "mp3"
# NOTE(review): the original carried the remark "This is not supported by old
# hardware" between the disabled "mpeg4" entry and "vp9"; it is unclear which
# of the two it refers to - confirm before changing either.
_MP4_VIDEO_CODECS = ["av1", "h264", "hevc", "vp9"]
_MP4_AUDIO_CODECS = ["aac", "ac3", "eac3", "opus"]
MP4_SUPPORTED_CODECS = _MP4_VIDEO_CODECS + _MP4_AUDIO_CODECS
def generate_for_videos(file_dir: Path):
    """Scan *file_dir* recursively and convert every video that needs it.

    A file is considered when it is a regular file whose lower-cased suffix
    is listed in ``VIDEO_SUFFIXES``. It is handed to ``convert()`` when at
    least one of its audio/video streams uses a codec that is not in
    ``MP4_SUPPORTED_CODECS``, or when its suffix is not ``.mp4``.

    Files that cannot be probed are skipped instead of aborting the scan.

    Args:
        file_dir: Root directory to walk.
    """
    for video in file_dir.rglob("*"):
        # Directories and non-video files (".gitignore" included: its suffix
        # is empty) are ignored.
        if not video.is_file() or video.suffix.lower() not in VIDEO_SUFFIXES:
            continue

        media_info = read_video_info(video)
        if not media_info:
            # read_video_info() returns {} when probing failed; there is no
            # stream information to base a conversion on.
            continue
        pprint.pprint(media_info)

        streams = media_info["video"] + media_info["audio"]
        valid_codec = all(
            stream["codec"] in MP4_SUPPORTED_CODECS for stream in streams
        )
        if not valid_codec or video.suffix.lower() != ".mp4":
            convert(video, media_info)
def convert(path: Path, fileinfo: dict):
    """Convert or remux *path* into an MP4 file next to it using ffmpeg.

    Streams whose codec is listed in ``MP4_SUPPORTED_CODECS`` are copied;
    other video streams are re-encoded with ``h264_nvenc`` and other audio
    streams with ``libfdk_aac``. Audio streams without a language get
    ``language=eng`` metadata.

    The target is ``<stem>.mp4``, or ``<stem>_x264.mp4`` when the source is
    already an ``.mp4``. When nothing was re-encoded (pure remux) and the
    output is larger than 98% of the source, the source file is deleted.
    When ffmpeg fails or the output is smaller than 100 bytes, the target is
    deleted and the source is kept.

    Args:
        path: Source video file.
        fileinfo: Stream summary with ``"video"`` and ``"audio"`` lists of
            dicts; each dict is read for ``"codec"`` and, for audio,
            ``"lang"`` (the shape produced by ``read_video_info()``).

    Returns:
        The path of the new file, or ``None`` when the target already
        existed or the conversion failed.
    """
    print(f"recoding {path}")

    # Pick a new name
    if path.suffix.lower() != ".mp4":
        out_path = path.with_name(path.stem + ".mp4")
    else:
        out_path = path.with_name(path.stem + "_x264.mp4")
    if out_path.is_file():
        print(f"Target file exists {out_path}, skipping")
        return None

    params = [
        "ffmpeg",
        "-hwaccel", "cuda",
        "-fflags", "+genpts",
    ]
    if path.suffix.lower() == ".vob":
        params.extend(["-probesize", "50M"])  # 50Mb
        params.extend(["-analyzeduration", "60M"])  # 60s
    params.extend([
        "-i", str(path),
        "-movflags", "+faststart",
        "-c:s", "copy",  # Copy subtitles
        "-c:t", "copy",  # Copy attachments
        "-c:d", "copy",  # Copy data
    ])

    # Stream specifier types: 'v' or 'V' for video, 'a' for audio, 's' for
    # subtitle, 'd' for data and 't' for attachments. 'v' matches all video
    # streams, 'V' only those that are not attached pictures, video
    # thumbnails or cover arts. The trailing '?' makes the mapping optional.
    params.extend(["-map", "0:v?"])
    params.extend(["-map", "0:a?"])
    params.extend(["-map", "0:s?"])
    params.extend(["-map", "0:t?"])
    # Data streams ("0:d?") are not mapped: data is not supported in mp4.

    transcoded = False
    for idx, video_stream in enumerate(fileinfo["video"]):
        if video_stream.get("codec") in MP4_SUPPORTED_CODECS:
            params.extend([f"-c:v:{idx}", "copy"])
        else:
            transcoded = True
            params.extend([
                f"-c:v:{idx}", "h264_nvenc",
                f"-preset:v:{idx}", "p7",
                f"-tune:v:{idx}", "hq",
                f"-rc:v:{idx}", "vbr",
                f"-cq:v:{idx}", "20",
                f"-b:v:{idx}", "0",
                f"-profile:v:{idx}", "high",
                f"-rc-lookahead:v:{idx}", "40",
            ])
    for idx, audio_stream in enumerate(fileinfo["audio"]):
        if audio_stream.get("codec") in MP4_SUPPORTED_CODECS:
            params.extend([f"-c:a:{idx}", "copy"])
        else:
            transcoded = True
            params.extend([f"-c:a:{idx}", "libfdk_aac", "-vbr", "4"])
        if audio_stream.get("lang") is None:
            params.extend([f"-metadata:s:a:{idx}", "language=eng"])
    params.append(str(out_path))

    print(' '.join(params))
    # The context manager closes the pipe and reaps the child even if
    # forwarding its output raises.
    with subprocess.Popen(params, stdout=subprocess.PIPE) as process:
        # read1() hands over whatever is available instead of one byte at a
        # time, without waiting for a full buffer.
        for chunk in iter(lambda: process.stdout.read1(65536), b''):
            sys.stdout.buffer.write(chunk)
        exit_code = process.wait()

    # ffmpeg may fail before the target is even created, so do not stat()
    # a file that might not exist.
    out_size = out_path.stat().st_size if out_path.is_file() else 0

    # Check for failure first so a bad result can never lead to the source
    # being removed.
    if exit_code != 0 or out_size < 100:
        print(f"Transcode failed, removing target file: {exit_code} - size {out_size}")
        if out_path.is_file():
            out_path.unlink()
        return None

    # Only works for remux: a re-encoded file has no predictable size, so
    # the source is kept in that case.
    if not transcoded and out_size > path.stat().st_size * 0.98:
        print("remux succeeded, Removing source file")
        path.unlink()
    return out_path
def get_duration(stream: dict):
    """Return the duration recorded for an ffprobe *stream* entry.

    Lookup order: the stream's own ``"duration"`` field (mp4), then the
    ``"DURATION-eng"`` tag (mkv), then the ``"DURATION"`` tag (webm).
    The value is returned as found, without conversion.
    """
    direct = stream.get("duration")  # mp4
    tags = stream.get("tags")
    if direct or not tags:
        # Either the container reported a duration, or there are no tags
        # to fall back on.
        return direct
    # mkv first, then webm
    return tags.get("DURATION-eng") or tags.get("DURATION")
def read_video_info(path: Path) -> dict:
    """Probe *path* with ``ffmpeg.probe`` and summarise its streams.

    Args:
        path: Media file to inspect.

    Returns:
        A dict with the keys ``"audio"`` and ``"video"`` (one dict per
        stream of that type, in probe order) and ``"file_path"``. An empty
        dict is returned when the file could not be probed.
    """
    try:
        probe = ffmpeg.probe(str(path))
    except Exception:
        # Best effort: any probing failure means "skip this file". Unlike a
        # bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        print(f"Couldn't probe video {path}. File probably broken")
        return {}

    media_data = {
        "audio": [],
        "video": [],
        "file_path": str(path),
    }
    for idx, stream in enumerate(probe["streams"]):
        codec_type = stream["codec_type"]
        print(f"stream {idx}: {codec_type}, ")
        # Tags are optional; normalise a missing or null entry to an empty
        # dict so every lookup below is safe.
        tags = stream.get("tags") or {}

        if codec_type == "audio":
            media_data["audio"].append({
                "codec": stream["codec_name"],
                "bitrate": stream.get("bit_rate"),
                "samplerate": stream.get("sample_rate"),
                "lang": tags.get("language"),
            })
        elif codec_type == "video":
            media_data["video"].append({
                "height": stream["height"],
                "width": stream["width"],
                "codec": stream["codec_name"],
                "bitrate": stream.get("bit_rate"),
                "framerate": stream.get("avg_frame_rate"),
                "duration": get_duration(stream),
                # Fall back to the frame-count tag when the stream has no
                # "nb_frames" field.
                "frames": stream.get(
                    "nb_frames", tags.get("NUMBER_OF_FRAMES-eng")
                ),
                "video_lang": tags.get("language"),
            })
    return media_data
# Process every video found below the directory the script is started from.
workingDir = Path(".").resolve()
generate_for_videos(file_dir=workingDir)
# (End of gist. The GitHub comment-section footer from the web capture was dropped.)