Last active
September 8, 2019 08:55
-
-
Save Yaulendil/1327391a5e0d083936c0a1532c153f5a to your computer and use it in GitHub Desktop.
FFedit: A prompt-based Python Wrapper around FFmpeg.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""A semi-interactive tool to manipulate the contents of a video file using | |
FFprobe and FFmpeg. | |
Provide a file and it will be analyzed with FFprobe. A prompt will then be | |
provided to allow interactive renaming of streams in the file. An empty | |
Input to the prompt will run the FFmpeg Command and move on to the next | |
File. | |
Created because I am too dumb to memorize all the options FFmpeg has. | |
""" | |
# print("Loading Imports") | |
from functools import partial | |
from getopt import getopt | |
from itertools import accumulate, chain | |
from json import loads | |
from pathlib import Path | |
from pprint import pprint | |
from re import compile | |
from readline import ( | |
get_line_buffer, | |
parse_and_bind, | |
set_completer, | |
set_completer_delims, | |
) | |
from subprocess import PIPE, run | |
from sys import argv | |
from typing import Iterable, List, Union | |
from blessings import Terminal | |
# from fuzzyfinder.main import fuzzyfinder | |
# print("Building Constants") | |
# Command line: the only recognised option is ``--script=FILE``; every
# remaining argument is kept in ``args`` and later treated as an input path.
opts, args = getopt(argv[1:], "", ["script="])

# Terminal helper that provides the colour/formatting callables used for output.
T = Terminal()

# Fixed part of the FFprobe invocation; the file path is appended by the caller.
ffp = (
    "ffprobe",
    "-hide_banner",
    "-print_format", "json",
    "-show_format",
    "-show_streams",
    "-show_error",
    "-show_programs",
    "-show_chapters",
    "-pretty",
)

# A dot followed by digits, when preceded by a digit (the fractional part).
all_decimals = compile(r"(?<=\d)\.\d+")
# Any digits beyond the second decimal place.
extra_digits = compile(r"(?<=\d\.\d\d)\d+")
# A single quote that is not preceded by a backslash.
single_quote = compile(r"(?<!\\)'")

# Every "-target" value offered: each type, with and without a standard prefix.
target_prefix = ("pal-", "ntsc-", "film-", "")
target_type = ("vcd", "svcd", "dvd", "dv", "dv50")
all_targets = [
    f"{prefix}{kind}"
    for prefix in target_prefix
    for kind in target_type
]
# Rows of ``ffprobe -formats`` that look like table entries: one whitespace
# character, two flag characters, one whitespace character, and then anything
# other than "=" (rows with "=" there are presumably the legend).
all_formats = [
    row
    for row in run(
        ("ffprobe", "-hide_banner", "-formats"), stdout=PIPE, stderr=PIPE
    )
    .stdout.decode()
    .split("\n")
    if compile(r"^\s\S\S\s(?=[^=])").match(row)
]


def format_match(s):
    """Return ``{name: description}`` for format rows whose flags match *s*.

    *s* is a regular-expression fragment applied to the flag column directly
    after the leading whitespace character (e.g. ``".E"`` or ``"D."``).
    Entries are ordered by name.
    """
    flag_pattern = compile(r"^\s" + s)
    entries = [
        # Everything after the 4-character flag prefix: name, then description.
        tuple(field.strip() for field in row[4:].split(maxsplit=1))
        for row in all_formats
        if flag_pattern.match(row)
    ]
    entries.sort(key=lambda entry: entry[0])
    return {name: description for name, description in entries}


can_mux = format_match(".E")
can_demux = format_match("D.")
# Rows of ``ffprobe -codecs`` that look like table entries: one whitespace
# character, six flag characters, one whitespace character, and then anything
# other than "=" (rows with "=" there are presumably the legend).
all_codecs = [
    row
    for row in run(
        ("ffprobe", "-hide_banner", "-codecs"), stdout=PIPE, stderr=PIPE
    )
    .stdout.decode()
    .split("\n")
    if compile(r"^\s\S\S\S\S\S\S\s(?=[^=])").match(row)
]


def codec_match(s):
    """Return ``{name: description}`` for codec rows whose flags match *s*.

    *s* is a regular-expression fragment applied to the six-character flag
    column directly after the leading whitespace character (e.g.
    ``r".E...."``). Entries are ordered by name.
    """
    flag_pattern = compile(r"^\s" + s)
    entries = [
        # Everything after the 8-character flag prefix: name, then description.
        tuple(field.strip() for field in row[8:].split(maxsplit=1))
        for row in all_codecs
        if flag_pattern.match(row)
    ]
    entries.sort(key=lambda entry: entry[0])
    return {name: description for name, description in entries}


# Tables keyed by capability (first two flag positions) ...
can_decode = codec_match(r"D.....")
can_encode = codec_match(r".E....")
# ... and by media type (third flag position).
codec_video = codec_match(r"..V...")
codec_audio = codec_match(r"..A...")
codec_subs = codec_match(r"..S...")
codecs = {"video": codec_video, "audio": codec_audio, "subs": codec_subs}
# Values accepted by the "disposition" command.
dispositions = [
    "unset",  # INTERNAL. Used to specify that the Disposition be "set" to 0.
    "default", "dub", "original", "comment", "lyrics", "karaoke", "forced",
    "hearing_impaired", "visual_impaired", "clean_effects", "attached_pic",
    "captions", "descriptions", "dependent", "metadata",
]

# Names of the tags known for the current file. The list object is shared with
# the completion tree below, so it must be updated in place (``TAGS[:] = ...``).
TAGS = []

# Completion tree: each key is a command word; a dict or list value supplies the
# candidates for the next word, and ``True`` marks a word with no candidates.
all_commands = {
    "help": True,
    "check": True,
    "include": {},  # Filled per file with the stream numbers.
    "exclude": {},  # Filled per file with the stream numbers.
    "encode": codecs,
    "format": can_mux,
    "target": dict.fromkeys(all_targets, True),
    "fps": True,
    "size": True,
    "title": True,
    "tags": {**dict.fromkeys(("set", "drop", "reset"), TAGS), "show": True},
    "disposition": {},  # Filled per file with the stream numbers.
}
# print("Preparing Program") | |
def deval(s: str):
    """Evaluate a ``/``-separated chain of integers as left-to-right division.

    ``"24000/1001"`` yields ``23.976``; a lone integer such as ``"25"`` is
    returned as that integer. The result is rounded to three decimal places.
    """
    terms = map(int, s.split("/"))
    quotient = next(terms)
    for divisor in terms:
        quotient = quotient / divisor
    return round(quotient, 3)
def _get(struct: dict, route: Union[Iterable[str], str], default=None): | |
here = struct | |
if isinstance(route, str): | |
route = route.split("/") | |
try: | |
for step in filter(None, route): | |
here = here[step] | |
except: | |
return default | |
else: | |
return here | |
def complete(text: str, i: int):
    """Readline completer: return the *i*-th candidate for *text*, or ``None``.

    The words already typed before the one being completed are used as a route
    into ``all_commands``. If that leads to a dict or list, its entries are the
    candidates, filtered by a case-insensitive prefix match (and sorted) when
    *text* is non-empty. Each candidate is returned with a trailing space.
    """
    try:
        line = get_line_buffer()
        if not line.strip():
            # Nothing typed yet: offer the top-level commands.
            return list(all_commands)[i] + " "

        # Drop the word under the cursor; the rest locates the candidates.
        preceding = line.split(" ")[:-1]
        node = _get(all_commands, preceding, all_commands)
        if not isinstance(node, (dict, list)):
            return None

        if not text:
            return str(list(node)[i]) + " "

        prefix = text.lower()
        matches = sorted(s for s in node if s.lower().startswith(prefix))
        return str(matches[i] + " ")
    except (IndexError, KeyError):
        # Running past the last candidate ends the completion cycle.
        return None
# Hook the completer into readline: words are delimited by space, tab, newline
# and ";", candidates come from complete(), and Tab triggers completion.
set_completer_delims(" \t\n;")
set_completer(complete)
parse_and_bind("tab: complete")
def cset(data):
    """Refresh the per-file parts of the completion tree from a probe report.

    The stream numbers ``"0"`` .. ``"n-1"`` (n being the number of entries in
    ``data["streams"]``) become the candidates for "include", "exclude" and
    "disposition"; each "disposition" stream offers the known dispositions.
    A report without a "streams" entry yields no candidates.
    """
    # Default to an empty list so a report lacking "streams" does not raise.
    stream_count = len(_get(data, "streams", []) or [])
    streams = [str(number) for number in range(stream_count)]

    # Separate dicts, so changing one command's candidates cannot affect the other.
    all_commands["include"] = dict.fromkeys(streams, True)
    all_commands["exclude"] = dict.fromkeys(streams, True)
    all_commands["disposition"] = dict.fromkeys(streams, dispositions)
class FFmpeg(object):
    """Collect the changes requested for one input file and render them as an
    FFmpeg command line.

    ``data`` is the parsed FFprobe report and ``path`` the input file. The
    command produced by ``build()`` writes to ``tmp``, a hidden sibling of
    ``target``.

    The class relies on module-level names: ``TAGS``, ``all_commands``,
    ``dispositions``, ``codecs``, ``can_encode``, ``all_targets`` and ``_get``.
    """

    def __init__(self, data, path: Path):
        self.data = data
        self.path: Path = path
        self.target: Path = self.path  # Final output path; "format" changes its suffix.

        self.initial = ["ffmpeg", "-loglevel", "error", "-stats"]
        self.general = []
        self.include = {}  # Stream number (str) -> whether to map it.
        self.codecs = {}  # Stream type ("video"/"audio"/"subs") -> codec name.
        self.disp = {}  # Disposition settings.
        self.fmt_target = None  # "Target" format, like "DVD", "DV50", etc.
        self.fps = None
        self.size = None

        self.tags_in = self.data.get("format", {}).get("tags", {})
        self.tags_out = {}
        # In-place update: the completion tree holds a reference to this list.
        TAGS[:] = list(self.tags_in)

        self.types = {
            i: d["codec_type"] for i, d in enumerate(self.data.get("streams", []))
        }

    def relative(self, stream: int) -> str:
        """Return the Stream Identifier in relative terms.

        For example, if Stream 5 is the second Audio Stream, the result of
        `.relative(5)` would be `"a:1"`: the first letter of the Stream's type
        and its position among the Streams of that type.
        """
        stype = self.types[stream]
        same_type = [k for k, v in self.types.items() if v == stype]
        return "{}:{}".format(stype[0], same_type.index(stream))

    def tags_final(self, show_all: bool = False):
        """Return the tags after applying the pending changes.

        With ``show_all`` false, only tags whose value differs from the input
        file are returned. A value of ``None`` marks a tag to be cleared.
        """
        merged = {**self.tags_in, **self.tags_out}
        return {
            key: value
            for key, value in merged.items()
            if show_all or value != self.tags_in.get(key)
        }

    @property
    def tmp(self) -> Path:
        """Temporary output path: ``.tmp.<name>`` next to ``target``."""
        return self.target.with_name(f".tmp.{self.target.name}")

    def build(self) -> List[str]:
        """Create the FFmpeg Command that will do what this Instance demands."""
        # Strings defining FFmpeg, overall options, then the Input File.
        parts = [*self.initial, *self.general, "-i", self.path]

        # Codec: explicit choices per type, else copy unless a Target decides.
        if self.codecs:
            for stype, codec in self.codecs.items():
                parts += (f"-c:{stype[0]}", codec)
        elif not self.fmt_target:
            parts += ("-c", "copy")

        if self.fps:  # Framerate.
            parts += ("-r", self.fps)
        if self.size:  # Resolution.
            parts += ("-s", self.size)

        for num, setting in self.disp.items():  # Disposition.
            parts += (f"-disposition:{num}", setting)

        for key, value in self.tags_final().items():  # Changed tags only.
            parts += ("-metadata", f"{key}={value}" if value is not None else f"{key}=")

        # Stream Selection: map everything unless some Stream was excluded.
        if all(self.include.values()):
            parts += ("-map", "0")
        else:
            for number, wanted in self.include.items():
                if wanted:
                    parts += ("-map", f"0:{number}")

        if self.fmt_target:
            parts += ("-target", self.fmt_target)
        parts += ("-strict", "-2")  # Allow for Experimental Codecs.
        parts.append(self.tmp)
        return [str(part) for part in parts]

    def _print_tags(self):
        """Print every tag with the value it will have in the output."""
        print(
            *(f"- {k!r}: {v!r}" for k, v in self.tags_final(True).items()),
            sep="\n",
        )

    def manipulate(self, cmd: List[str]):
        """Apply one prompt command, given as a list of words.

        Raises ``IndexError`` when the command lacks a required word; callers
        report that as an incomplete command. Invalid values are reported with
        a message and leave the instance unchanged.
        """
        op = cmd[0]
        if op == "help" or op == "?":
            print(" ", "\n ".join(all_commands.keys()))

        elif op == "check":
            print(
                f"Command Preview:"
                f"\n {' '.join(self.build())}"
                f"\n && mv -f {self.tmp} {self.target}"
                f"\n || rm {self.tmp}"
            )

        elif op == "include":
            if cmd[1] in self.include:
                self.include[cmd[1]] = True
                print(f"Stream 0:{cmd[1]} will be included.")
            else:
                print(f"Unknown Stream {cmd[1]!r}.")

        elif op == "exclude":
            if cmd[1] in self.include:
                self.include[cmd[1]] = False
                print(f"Stream 0:{cmd[1]} will NOT be included.")
            else:
                print(f"Unknown Stream {cmd[1]!r}.")

        elif op == "disposition":
            try:
                stream = int(cmd[1])
            except ValueError:
                # A typo must not take down the whole prompt.
                print(f"Invalid Stream {cmd[1]!r}.")
            else:
                disp = cmd[2]
                if disp == "unset":
                    self.disp[stream] = "0"
                    print("Unsetting Disposition.")
                elif disp in dispositions:
                    self.disp[stream] = disp
                    print(f"Setting Disposition to {disp.replace('_', ' ').title()}.")
                else:
                    print("Unknown Disposition.")

        elif op == "encode":
            stype = cmd[1]
            if stype in codecs:  # Valid type (video, audio, or subs).
                codec = cmd[2]
                if codec == "none":  # Unset encoder.
                    self.codecs.pop(stype, None)
                    print(f"Codec unset for {stype}.")
                elif codec == "copy":
                    self.codecs[stype] = "copy"
                    print(f"Codec for {stype} will be copied from input.")
                elif codec in can_encode:  # Encoder present.
                    if codec in codecs[stype]:  # Codec matches type.
                        self.codecs[stype] = codec
                        print(f"Will encode {stype} with codec: {codecs[stype][codec]}")
                    else:
                        print(f"Codec {codec!r} is not a codec for {stype}.")
                else:
                    print(f"Encoder for {codec!r} not found.")
            else:
                print(f"Unknown stream type {stype!r}.")

        elif op == "format":
            extension = cmd[1]
            try:
                # with_suffix() rejects suffixes it considers invalid.
                self.target = self.target.with_suffix(f".{extension}")
            except ValueError:
                print("Invalid Extension.")
            else:
                print(f"File Extension set: .{extension}")

        elif op == "target":
            if cmd[1] in all_targets:
                self.fmt_target = cmd[1]
            else:
                print("Invalid Target.")

        elif op == "fps":
            if cmd[1].isdigit() or cmd[1] == "copy":
                self.fps = cmd[1]
                print(
                    "FPS will be copied from input."
                    if cmd[1] == "copy"
                    else f"Framerate will be {cmd[1]}fps."
                )
            else:
                print("Invalid Framerate.")

        elif op == "size":
            self.size = cmd[1]
            print(f"Output will be rescaled to {cmd[1]}.")

        elif op == "title":
            # "{}" in the new title is replaced by the title of the input file.
            template = " ".join(cmd[1:])
            try:
                title = template.format(_get(self.data, "format/tags/title"))
            except (KeyError, IndexError, ValueError, AttributeError):
                # Stray or unknown replacement fields in the typed text.
                print(f"Invalid Title template: {template!r}")
            else:
                self.tags_out["title"] = title
                print(f"Title will be set to: {title!r}")

        elif op == "tags":
            if len(cmd) < 2:
                self._print_tags()
            else:
                act = cmd[1]
                if act == "set":
                    key, words = cmd[2], cmd[3:]
                    if not words:
                        # Checked before changing anything, so an incomplete
                        # command leaves the tag untouched.
                        raise IndexError("tags set needs a value")
                    value = " ".join(words)
                    self.tags_out[key] = value
                    print(f"Value of tag {key!r} will be changed to: {value!r}")
                elif act == "drop":
                    self.tags_out[cmd[2]] = None
                    print(f"Value of tag {cmd[2]!r} will be cleared.")
                elif act == "reset":
                    self.tags_out.pop(cmd[2], None)
                    print(f"Value of tag {cmd[2]!r} will be reset.")
                elif act == "show":
                    self._print_tags()
                else:
                    print("Unknown Operation:", cmd[1])

        elif op == "raw":
            pprint(self.data)

        else:
            print("Unknown Command:", op)

        # Keep the completion candidates for tag names current, in a stable
        # order: input tags first, then newly added ones.
        TAGS[:] = list(dict.fromkeys(chain(self.tags_in, self.tags_out)))
def probe(path: Path):
    """Run FFprobe on *path* and return its JSON report as Python data.

    FFprobe is run with ``check=True``, so a non-zero exit status raises
    instead of returning a report.
    """
    command = ffp + (str(path),)
    finished = run(command, check=True, stdout=PIPE, stderr=PIPE)
    return loads(finished.stdout)
def _print_stream(stream: dict) -> None:
    """Print the summary of one stream entry from the FFprobe report."""
    print(
        T.bold_magenta(f" Stream {stream['index']}: {stream['codec_type'].title()}"),
        end="",
    )
    # Finish the headline with video details if the stream has them, else
    # with audio details, else with a bare colon.
    try:
        try:
            headline = T.magenta(
                f", {stream['width']}x{stream['height']}p"
                f", {deval(stream['avg_frame_rate'])}fps"
                f", {len(stream.get('chapters', []))} chapters:"
            )
        except (KeyError, ValueError, ZeroDivisionError):
            headline = T.magenta(
                f", {stream['sample_rate']}"
                f", {stream['channels']} channels:"
            )
    except KeyError:
        headline = T.bold_magenta(":")
    print(headline)

    active = ", ".join(k for k, v in _get(stream, "disposition", {}).items() if v)
    print(
        T.red(" Disposition:"),
        T.bold_green(active.replace("_", " ").title() or "Not Set"),
    )
    if "channels" in stream:
        print(T.red(" Channels:"), T.bright_magenta(str(stream["channels"])))
    print(
        T.red(" Codec:"),
        T.bold_bright_cyan(stream["codec_name"]),
        T.cyan(f"({stream['codec_long_name']})"),
    )
    print(T.red(" Language:"), T.cyan(repr(_get(stream, "tags/language"))))


def _apply_command(ff, cmd: List[str]) -> None:
    """Hand one command to *ff*, reporting a missing word instead of raising."""
    try:
        ff.manipulate(cmd)
    except IndexError:
        print("Incomplete Command.")


def convert(inpath: Path, script: List[List[str]] = None):
    """Describe *inpath*, collect commands for it, and run FFmpeg.

    With ``script`` set to ``None`` the commands are read from an interactive
    prompt, where an empty line starts FFmpeg; if FFmpeg fails, the prompt is
    shown again. Otherwise every command line in ``script`` is applied and
    FFmpeg runs exactly once.

    On success the temporary output replaces the target file; on failure (or
    Ctrl-C while FFmpeg runs) the temporary output is removed.

    Raises ``FileNotFoundError`` if *inpath* is not a file.
    """
    if not inpath.is_file():
        raise FileNotFoundError(inpath)

    data = probe(inpath)
    get = partial(_get, data)
    cset(data)
    ff = FFmpeg(data, inpath)

    print(
        T.red("\nTitle:"),
        T.bright_blue_underline(get("format/tags/title", "<unset>")),
    )
    print(T.red("File:"), T.bright_blue(get("format/filename")))
    print(
        T.red(" Format:"),
        T.bold_bright_cyan(get("format/format_name")),
        T.cyan(f"({get('format/format_long_name')})"),
    )
    # Defaults keep the regex substitutions from failing on a missing value.
    print(
        T.red(" Duration:"),
        T.cyan(all_decimals.sub("", get("format/duration", "N/A"))),
    )
    print(T.red(" Size:"), T.cyan(extra_digits.sub("", get("format/size", "N/A"))))
    print(
        T.red("Stream Count:"),
        T.bold_cyan(str(get("format/nb_streams", len(get("streams", []))))),
    )

    for stream in get("streams", []):
        _print_stream(stream)
        ff.include[str(stream["index"])] = True

    try:
        described = run(("file", str(inpath)), stdout=PIPE, stderr=PIPE).stdout.decode()
        file_type = described.split(":", 1)[-1].strip()
    except OSError:
        # The "file" utility could not be started; the summary line is optional.
        file_type = "<unknown>"
    print(T.red("Type:"), T.cyan(file_type))

    while True:
        if script is None:
            # Take manual input per-file: a VERY basic REPL, ended by an empty line.
            print()
            action = input("Action: ")
            while action:
                _apply_command(ff, action.split())
                print()
                action = input("Action: ")
        else:
            # Replay the commands read from a file.
            for cline in script:
                print("\nAction:", *cline)
                _apply_command(ff, cline)

        cmd = ff.build()
        print(T.bold("### EXECUTING:\n "), T.bold_italic_black(" ".join(cmd)))
        print("---")
        try:
            with T.hidden_cursor():
                run(cmd, check=True)
        # KeyboardInterrupt is included on purpose: interrupting FFmpeg must
        # still clean up the partial output.
        except (Exception, KeyboardInterrupt):
            print(" \n---")
            print(T.bright_red("Process failed. Removing temporary file."))
            if ff.tmp.is_file():
                ff.tmp.unlink()
            if script is not None:
                # A script cannot be corrected interactively; retrying the
                # same commands would fail again (also for an empty script).
                break
        else:
            print(" \n---")
            print(T.green("Process completed. Replacing target file."))
            # A Path object is always truthy; test for the file itself.
            if ff.tmp.is_file():
                ff.tmp.replace(ff.target)
            break
def main():
    """Process every file named on the command line.

    Commands from each ``--script`` file are collected, in order, and applied
    to every input file; without any script commands each file gets an
    interactive prompt. Ctrl-C skips the current file, and end of input (EOF
    at the prompt) ends the whole run.
    """
    try:
        scr: List[List[str]] = []
        for opt, val in opts:
            if opt != "--script":
                continue
            scriptfile = Path(val)
            if not scriptfile.is_file():
                # Say so, rather than silently ignoring the option.
                print(T.bright_red(f"Script file not found: {scriptfile}"))
                continue
            with scriptfile.open("r") as file:
                for line in file:
                    words = line.split()
                    if words:  # Skip blank and whitespace-only lines.
                        scr.append(words)

        total = len(args)
        for i, pathstr in enumerate(args, 1):
            print(T.bold(f"\n### FILE {i}/{total}"))
            try:
                convert(Path(pathstr), scr or None)
                print(T.bold("\n### FILE PROCESSED"))
            except KeyboardInterrupt:
                print(T.bold("\n### FILE SKIPPED"))
    except EOFError:
        print(T.bold("\n### INPUT ENDED"))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment