Skip to content

Instantly share code, notes, and snippets.

@Yaulendil
Last active September 8, 2019 08:55
Show Gist options
  • Save Yaulendil/1327391a5e0d083936c0a1532c153f5a to your computer and use it in GitHub Desktop.
Save Yaulendil/1327391a5e0d083936c0a1532c153f5a to your computer and use it in GitHub Desktop.
FFedit: A prompt-based Python Wrapper around FFmpeg.
#!/usr/bin/env python3
"""A semi-interactive tool to manipulate the contents of a video file using
FFprobe and FFmpeg.
Provide a file and it will be analyzed with FFprobe. A prompt will then be
provided to allow interactive renaming of streams in the file. An empty
Input to the prompt will run the FFmpeg Command and move on to the next
File.
Created because I am too dumb to memorize all the options FFmpeg has.
"""
# print("Loading Imports")
from functools import partial
from getopt import getopt
from itertools import accumulate, chain
from json import loads
from pathlib import Path
from pprint import pprint
from re import compile
from readline import (
get_line_buffer,
parse_and_bind,
set_completer,
set_completer_delims,
)
from subprocess import PIPE, run
from sys import argv
from typing import Iterable, List, Union
from blessings import Terminal
# from fuzzyfinder.main import fuzzyfinder
# print("Building Constants")
# Parse the command line once, at import time. The only recognised option is
# the long option "--script" (which takes a value); everything left over ends
# up in `args`.
opts, args = getopt(argv[1:], "", ["script="])
# Terminal object from `blessings`; its attributes (T.red, T.bold, ...) are
# used throughout the file to style printed text.
T = Terminal()
# Base FFprobe command line, grouped by purpose. The path of the file to
# inspect is appended to this tuple when a file is probed.
ffp = (
    ("ffprobe", "-hide_banner")
    # Machine-readable output.
    + ("-print_format", "json")
    # Report sections to include.
    + (
        "-show_format",
        "-show_streams",
        "-show_error",
        "-show_programs",
        "-show_chapters",
    )
    # Human-friendly units for the reported values.
    + ("-pretty",)
)
# A decimal point and the digits after it, when directly preceded by a digit.
all_decimals = compile(r"(?<=\d)\.\d+")
# Any digits following the second decimal place of a number.
extra_digits = compile(r"(?<=\d\.\d\d)\d+")
# A single quote that is not escaped with a backslash.
single_quote = compile(r"(?<!\\)'")

# Every combination of prefix (possibly empty) and type, prefix-major order.
target_prefix = ("pal-", "ntsc-", "film-", "")
target_type = ("vcd", "svcd", "dvd", "dv", "dv50")
all_targets = [prefix + kind for prefix in target_prefix for kind in target_type]
# Rows of the `ffprobe -formats` table. A row starts with a blank, then the
# demux flag ("D" or blank), the mux flag ("E" or blank), another blank, and
# then the format name. The previous pattern required two non-blank flag
# characters, which dropped every format that can only be muxed or only be
# demuxed. The lookahead still rejects the legend lines (" D. = ...").
# NOTE(review): relies on the column layout printed by FFmpeg; verify against
# the installed version.
_format_row = compile(r"^\s(?:DE|D | E)\s(?=[^=])")
all_formats = [
    row
    for row in run(("ffprobe", "-hide_banner", "-formats"), stdout=PIPE, stderr=PIPE)
    .stdout.decode()
    .split("\n")
    if _format_row.match(row)
]


def format_match(s):
    """Return ``{name: description}`` for the format rows whose flags match *s*.

    *s* is a regular-expression fragment matched against the two flag
    columns of each row in ``all_formats`` (for example ``".E"`` for "can be
    muxed"). The result is ordered by format name. Rows without a
    description map to an empty string.
    """
    wanted = compile(r"^\s" + s)  # Compiled once, not once per row.
    table = {}
    for row in all_formats:
        if not wanted.match(row):
            continue
        # Column 4 onwards holds "<name> <description>".
        fields = row[4:].split(maxsplit=1)
        if fields:
            table[fields[0]] = fields[1].strip() if len(fields) > 1 else ""
    return dict(sorted(table.items(), key=lambda item: item[0]))


can_mux = format_match(".E")
can_demux = format_match("D.")
# Rows of the `ffprobe -codecs` table: a blank, six non-blank flag characters,
# a blank, then the codec name. The lookahead rejects legend lines, where an
# "=" follows the flags.
_codec_row = compile(r"^\s\S\S\S\S\S\S\s(?=[^=])")
all_codecs = [
    row
    for row in run(("ffprobe", "-hide_banner", "-codecs"), stdout=PIPE, stderr=PIPE)
    .stdout.decode()
    .split("\n")
    if _codec_row.match(row)
]


def codec_match(s):
    """Return ``{name: description}`` for the codec rows whose flags match *s*.

    *s* is a regular-expression fragment matched against the six flag
    columns of each row in ``all_codecs`` (for example ``r".E...."``). The
    result is ordered by codec name. Rows without a description map to an
    empty string.
    """
    wanted = compile(r"^\s" + s)  # Compiled once, not once per row.
    table = {}
    for row in all_codecs:
        if not wanted.match(row):
            continue
        # Column 8 onwards holds "<name> <description>".
        fields = row[8:].split(maxsplit=1)
        if fields:
            table[fields[0]] = fields[1].strip() if len(fields) > 1 else ""
    return dict(sorted(table.items(), key=lambda item: item[0]))


can_decode = codec_match(r"D.....")
can_encode = codec_match(r".E....")
codec_video = codec_match(r"..V...")
codec_audio = codec_match(r"..A...")
codec_subs = codec_match(r"..S...")
# Codec tables keyed by the stream-type words accepted by the "encode" command.
codecs = {"video": codec_video, "audio": codec_audio, "subs": codec_subs}
# Words accepted as the setting of the "disposition" command.
# "unset" is INTERNAL: it specifies that the Disposition be "set" to 0.
dispositions = ["unset"]
dispositions += [
    "default",
    "dub",
    "original",
    "comment",
    "lyrics",
    "karaoke",
    "forced",
    "hearing_impaired",
    "visual_impaired",
    "clean_effects",
    "attached_pic",
    "captions",
    "descriptions",
    "dependent",
    "metadata",
]
# Tag names offered for completion. The list object is shared by the "tags"
# sub-commands below and is refreshed in place (TAGS[:] = ...) elsewhere.
TAGS = []

# Completion tree for the prompt: each key is a command word; a dict or list
# value holds the words that may follow it, True marks a command with nothing
# further to complete.
all_commands = {
    "help": True,
    "check": True,
    # Filled in per file once the streams are known.
    "include": {},
    "exclude": {},
    "encode": codecs,
    "format": can_mux,
    "target": {name: True for name in all_targets},
    "fps": True,
    "size": True,
    "title": True,
    "tags": {**dict.fromkeys(("set", "drop", "reset"), TAGS), "show": True},
    # Filled in per file once the streams are known.
    "disposition": {},
}
# print("Preparing Program")
def deval(s: str):
    """Evaluate a "/"-separated chain of integers and round to 3 places.

    ``"30000/1001"`` becomes ``29.97``; a string holding a single integer is
    returned as that integer. Division is applied left to right.
    """
    numbers = map(int, s.split("/"))
    value = next(numbers)
    for divisor in numbers:
        value = value / divisor
    return round(value, 3)
def _get(struct: dict, route: Union[Iterable[str], str], default=None):
here = struct
if isinstance(route, str):
route = route.split("/")
try:
for step in filter(None, route):
here = here[step]
except:
return default
else:
return here
def complete(text: str, i: int):
    """Readline completer: return the *i*-th candidate for *text*, or None.

    The words already typed (all but the last) are followed through
    ``all_commands``; if that path does not exist, the top level is used.
    With a non-empty *text*, the candidates are the entries at that point
    which start with *text* (case-insensitive), sorted. With an empty
    *text*, all entries are offered in their stored order. Every candidate
    is returned with a trailing space.
    """
    try:
        line = get_line_buffer()
        if not line.strip():
            # Nothing typed yet: offer the top-level commands.
            return list(all_commands)[i] + " "

        typed = line.split(" ")[:-1]  # The last word is the one being completed.
        node = _get(all_commands, typed, all_commands)
        if not isinstance(node, (dict, list)):
            return None

        if text:
            prefix = text.lower()
            candidates = sorted(s for s in node if s.lower().startswith(prefix))
        else:
            candidates = list(node)
        return str(candidates[i]) + " "
    except (IndexError, KeyError):
        # Ran out of candidates.
        return None


parse_and_bind("tab: complete")
set_completer(complete)
set_completer_delims(" \t\n;")
def cset(data):
    """Point the stream-related completions at the streams found in *data*.

    *data* is a probe report; its "streams" list determines how many stream
    numbers ("0", "1", ...) are offered after "include", "exclude" and
    "disposition". A report without a "streams" entry yields no stream
    numbers instead of raising.
    """
    count = len(_get(data, "streams", []))
    streams = [str(number) for number in range(count)]
    # "include" and "exclude" intentionally share one mapping.
    selectable = dict.fromkeys(streams, True)
    all_commands["include"] = selectable
    all_commands["exclude"] = selectable
    all_commands["disposition"] = dict.fromkeys(streams, dispositions)
class FFmpeg(object):
    """Collect the changes requested for one input file and render them as
    an FFmpeg command line.

    The instance starts out describing a plain stream copy of ``path`` onto
    itself (via a temporary file next to the target). ``manipulate`` applies
    one prompt command at a time; ``build`` returns the resulting argument
    list.
    """

    # FFmpeg stream-specifier letters by FFprobe "codec_type". Attachments
    # use "t"; taking the first letter would collide with audio ("a").
    _TYPE_LETTERS = {
        "video": "v",
        "audio": "a",
        "subtitle": "s",
        "data": "d",
        "attachment": "t",
    }

    def __init__(self, data, path: Path):
        """Store the probe report *data* for the file at *path*.

        Also resets the shared tag-completion list to the input file's tags.
        """
        self.data = data
        self.path: Path = path
        self.target: Path = self.path
        self.initial = ["ffmpeg", "-loglevel", "error", "-stats"]
        self.general = []
        self.include = {}  # Stream index (as str) -> keep this stream?
        self.codecs = {}  # "video"/"audio"/"subs" -> codec name or "copy".
        self.disp = {}  # Disposition settings.
        self.fmt_target = None  # "Target" format, like "DVD", "DV50", etc.
        self.fps = None
        self.size = None
        self.tags_in = self.data.get("format", {}).get("tags", {})
        self.tags_out = {}  # Tag -> new value; None means "clear the tag".
        TAGS[:] = list(self.tags_in)
        self.types = {
            i: d["codec_type"] for i, d in enumerate(self.data.get("streams", []))
        }

    def relative(self, stream: int) -> str:
        """Return the Stream Identifier in relative terms.

        For example, if Stream 5 is the second Audio Stream, the result of
        `.relative(5)` would be `"a:1"`.

        Raises KeyError if *stream* is not a known stream number.
        """
        stype = self.types[stream]
        same_type = [k for k, v in self.types.items() if v == stype]
        letter = self._TYPE_LETTERS.get(stype, stype[0])
        return "{}:{}".format(letter, same_type.index(stream))

    def tags_final(self, show_all: bool = False):
        """Return the tags of the output file.

        By default only tags whose value differs from the input are
        returned; with *show_all* every tag is included. A value of None
        marks a tag that will be cleared.
        """
        merged = {**self.tags_in, **self.tags_out}
        return {
            key: value
            for key, value in merged.items()
            if show_all or value != self.tags_in.get(key)
        }

    @property
    def tmp(self) -> Path:
        """Path of the temporary output file, located beside the target."""
        return self.target.with_name(f".tmp.{self.target.name}")

    def build(self) -> List[str]:
        """Create the FFmpeg Command that will do what this Instance demands."""
        cmd = [*self.initial, *self.general, "-i", self.path]

        # Codec: explicit choices win; otherwise copy, unless a "-target"
        # preset is in effect.
        if self.codecs:
            for stype, codec in self.codecs.items():
                cmd += (f"-c:{stype[0]}", codec)
        elif not self.fmt_target:
            cmd += ("-c", "copy")

        if self.fps:  # Framerate.
            cmd += ("-r", self.fps)
        if self.size:  # Resolution.
            cmd += ("-s", self.size)

        for num, setting in self.disp.items():  # Disposition.
            cmd += (f"-disposition:{num}", setting)

        for key, value in self.tags_final().items():  # Changed tags only.
            cmd += ("-metadata", f"{key}={value}" if value is not None else f"{key}=")

        # Stream Selection: map everything unless something was excluded.
        if all(self.include.values()):
            cmd += ("-map", "0")
        else:
            for index, selected in self.include.items():
                if selected:
                    cmd += ("-map", f"0:{index}")

        if self.fmt_target:
            cmd += ("-target", self.fmt_target)
        cmd += ("-strict", "-2")  # Allow for Experimental Codecs.
        cmd.append(self.tmp)
        return [str(part) for part in cmd]

    def manipulate(self, cmd: List[str]):
        """Apply one prompt command, given as a list of words.

        Feedback is printed. Raises IndexError when the command is missing
        required words; callers report that as an incomplete command.
        """
        name = cmd[0]
        if name == "help" or name == "?":
            print(" ", "\n ".join(all_commands))
        elif name == "check":
            print(
                "Command Preview:"
                f"\n {' '.join(self.build())}"
                f"\n && mv -f {self.tmp} {self.target}"
                f"\n || rm {self.tmp}"
            )
        elif name == "include" or name == "exclude":
            stream = cmd[1]
            if stream not in self.include:
                print("Unknown Stream:", stream)
            elif name == "include":
                self.include[stream] = True
                print(f"Stream 0:{stream} will be included.")
            else:
                self.include[stream] = False
                print(f"Stream 0:{stream} will NOT be included.")
        elif name == "disposition":
            try:
                stream = int(cmd[1])
            except ValueError:
                # A non-numeric stream used to escape as an uncaught error.
                print("Invalid Stream.")
            else:
                disp = cmd[2]
                if disp == "unset":
                    self.disp[stream] = "0"
                    print("Unsetting Disposition.")
                elif disp in dispositions:
                    self.disp[stream] = disp
                    print(f"Setting Disposition to {disp.replace('_', ' ').title()}.")
                else:
                    print("Unknown Disposition.")
        elif name == "encode":
            stype = cmd[1]
            if stype not in codecs:  # Not video, audio, or subs.
                print(f"Unknown stream type {stype!r}.")
            else:
                codec = cmd[2]
                if codec == "none":  # Unset encoder.
                    self.codecs.pop(stype, None)
                    print(f"Codec unset for {stype}.")
                elif codec == "copy":
                    self.codecs[stype] = "copy"
                    print(f"Codec for {stype} will be copied from input.")
                elif codec not in can_encode:  # No encoder present.
                    print(f"Encoder for {codec!r} not found.")
                elif codec in codecs[stype]:  # Codec matches type.
                    self.codecs[stype] = codec
                    print(f"Will encode {stype} with codec: {codecs[stype][codec]}")
                else:
                    print(f"Codec {codec!r} is not a codec for {stype}.")
        elif name == "format":
            extension = cmd[1]
            try:
                self.target = self.target.with_suffix(f".{extension}")
            except ValueError:  # What with_suffix raises for a bad suffix.
                print("Invalid Extension.")
            else:
                print(f"File Extension set: .{extension}")
        elif name == "target":
            if cmd[1] in all_targets:
                self.fmt_target = cmd[1]
            else:
                print("Invalid Target.")
        elif name == "fps":
            rate = cmd[1]
            if rate == "copy":
                # Emit no "-r" at all; "-r copy" is not a valid frame rate.
                self.fps = None
                print("FPS will be copied from input.")
            elif rate.isdigit():
                self.fps = rate
                print(f"Framerate will be {rate}fps.")
            else:
                print("Invalid Framerate.")
        elif name == "size":
            self.size = cmd[1]
            print(f"Output will be rescaled to {cmd[1]}.")
        elif name == "title":
            template = " ".join(cmd[1:])
            try:
                # "{}" in the new title stands for the current title.
                title = template.format(_get(self.data, "format/tags/title"))
            except (KeyError, ValueError, AttributeError, TypeError):
                # Stray or unsupported braces in the typed text.
                print("Invalid Title.")
            else:
                self.tags_out["title"] = title
                print(f"Title will be set to: {title!r}")
        elif name == "tags":
            act = cmd[1] if len(cmd) > 1 else "show"
            if act == "set":
                if len(cmd) < 4:
                    # Signal "incomplete" before anything is modified.
                    raise IndexError("tags set needs a tag and a value")
                key, value = cmd[2], " ".join(cmd[3:])
                self.tags_out[key] = value
                print(f"Value of tag {key!r} will be changed to: {value!r}")
            elif act == "drop":
                self.tags_out[cmd[2]] = None
                print(f"Value of tag {cmd[2]!r} will be cleared.")
            elif act == "reset":
                self.tags_out.pop(cmd[2], None)
                print(f"Value of tag {cmd[2]!r} will be reset.")
            elif act == "show":
                print(
                    *(f"- {k!r}: {v!r}" for k, v in self.tags_final(True).items()),
                    sep="\n",
                )
            else:
                print("Unknown Operation:", act)
        elif name == "raw":
            pprint(self.data)
        else:
            print("Unknown Command:", name)

        # Refresh the completion list in place, first-seen order, no repeats.
        TAGS[:] = list(dict.fromkeys(chain(self.tags_in, self.tags_out)))
def probe(path: Path):
    """Run FFprobe on *path* and return its JSON report as Python data.

    Raises subprocess.CalledProcessError if FFprobe exits with an error.
    """
    command = ffp + (str(path),)
    finished = run(command, check=True, stdout=PIPE, stderr=PIPE)
    return loads(finished.stdout)
def _print_file_summary(get):
    """Print the file-level part of a probe report; *get* looks up a path in it."""
    print(
        T.red("\nTitle:"),
        T.bright_blue_underline(get("format/tags/title", "<unset>")),
    )
    print(T.red("File:"), T.bright_blue(get("format/filename")))
    print(
        T.red(" Format:"),
        T.bold_bright_cyan(get("format/format_name")),
        T.cyan(f"({get('format/format_long_name')})"),
    )
    # A report without duration/size would otherwise hand None to re.sub.
    print(
        T.red(" Duration:"),
        T.cyan(all_decimals.sub("", get("format/duration", "N/A"))),
    )
    print(T.red(" Size:"), T.cyan(extra_digits.sub("", get("format/size", "N/A"))))
    print(
        T.red("Stream Count:"),
        T.bold_cyan(str(get("format/nb_streams", len(get("streams", []))))),
    )


def _print_stream_summary(stream):
    """Print the description of one stream entry of a probe report."""
    print(
        T.bold_magenta(f" Stream {stream['index']}: {stream['codec_type'].title()}"),
        end="",
    )
    # Try the video-style details first, then the audio-style ones; a stream
    # offering neither just gets a colon.
    try:
        details = (
            f", {stream['width']}x{stream['height']}p"
            f", {deval(stream['avg_frame_rate'])}fps"
            f", {len(stream.get('chapters', []))} chapters:"
        )
    except (KeyError, ValueError, ZeroDivisionError):
        try:
            details = f", {stream['sample_rate']}" f", {stream['channels']} channels:"
        except KeyError:
            details = None
    print(T.magenta(details) if details is not None else T.bold_magenta(":"))

    flags = ", ".join(k for k, v in _get(stream, "disposition", {}).items() if v)
    print(
        T.red(" Disposition:"),
        T.bold_green(flags.replace("_", " ").title() or "Not Set"),
    )
    if "channels" in stream:
        print(T.red(" Channels:"), T.bright_magenta(str(stream["channels"])))
    print(
        T.red(" Codec:"),
        T.bold_bright_cyan(stream.get("codec_name", "<unknown>")),
        T.cyan(f"({stream.get('codec_long_name')})"),
    )
    print(T.red(" Language:"), T.cyan(repr(_get(stream, "tags/language"))))


def _apply_command(ff, words):
    """Hand one command to *ff*, reporting commands that lack required words."""
    try:
        ff.manipulate(words)
    except IndexError:
        print("Incomplete Command.")


def convert(inpath: Path, script: List[List[str]] = None):
    """Describe the file at *inpath*, collect edits, and run FFmpeg on it.

    Without *script*, commands are read from the prompt until an empty line
    is entered; a failed FFmpeg run returns to the prompt. With *script*
    (a list of commands, each a list of words), the commands are applied
    once and a failed run is not retried. On success the temporary output
    replaces the target file.

    Raises FileNotFoundError if *inpath* is not an existing file.
    """
    if not inpath.is_file():
        raise FileNotFoundError(inpath)

    data = probe(inpath)
    get = partial(_get, data)
    cset(data)
    ff = FFmpeg(data, inpath)

    _print_file_summary(get)
    for stream in get("streams", []):
        _print_stream_summary(stream)
        ff.include[str(stream["index"])] = True  # Every stream starts included.
    print(
        T.red("Type:"),
        T.cyan(
            run(("file", str(inpath)), stdout=PIPE, stderr=PIPE)
            .stdout.decode()
            .split(":", 1)[-1]
            .strip()
        ),
    )

    while True:
        if script is None:
            # Take manual input per-file: a VERY basic REPL that ends on an
            # empty line.
            print()
            action = input("Action: ")
            while action:
                _apply_command(ff, action.split())
                print()
                action = input("Action: ")
        else:
            # Replay the prepared commands.
            for cline in script:
                print("\nAction:", *cline)
                _apply_command(ff, cline)

        cmd = ff.build()
        print(T.bold("### EXECUTING:\n "), T.bold_italic_black(" ".join(cmd)))
        print("---")
        try:
            with T.hidden_cursor():
                run(cmd, check=True)
        except (Exception, KeyboardInterrupt):
            # Ctrl-C during the encode is treated like a failed run on
            # purpose: clean up, then (interactively) return to the prompt.
            print(" \n---")
            print(T.bright_red("Process failed. Removing temporary file."))
            if ff.tmp.is_file():
                ff.tmp.unlink()
            if script is not None:
                # Replaying the same script would only fail again.
                break
        else:
            print(" \n---")
            print(T.green("Process completed. Replacing target file."))
            ff.tmp.replace(ff.target)
            break
def main():
    """Process every file named on the command line.

    Commands are first gathered from each existing "--script" file (one
    command per non-blank line, split into words). Each positional argument
    is then converted in turn, using those commands if there are any and
    the interactive prompt otherwise. Ctrl-C skips the current file; end of
    input stops the whole run.
    """
    try:
        scr: List[List[str]] = []
        for flag, value in opts:
            if flag != "--script":
                continue
            scriptfile = Path(value)
            if not scriptfile.is_file():
                continue
            with scriptfile.open("r") as file:
                for line in file:
                    stripped = line.strip("\n ")
                    if stripped:
                        scr.append(stripped.split())

        total = len(args)
        for i, pathstr in enumerate(args, 1):
            print(T.bold(f"\n### FILE {i}/{total}"))
            try:
                convert(Path(pathstr), scr or None)
                print(T.bold("\n### FILE PROCESSED"))
            except KeyboardInterrupt:
                print(T.bold("\n### FILE SKIPPED"))
    except EOFError:
        print(T.bold("\n### INPUT ENDED"))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment