Last active
October 14, 2023 14:50
-
-
Save carlos-a-g-h/64c83e9fe2384ee0fbb1c562fee2388e to your computer and use it in GitHub Desktop.
FFmpeg python script for some common tasks. Requires FFmpeg installed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3.9 | |
import json | |
import subprocess | |
from datetime import datetime | |
from pathlib import Path | |
# FFToolkit by t.me/CarlosAGH | |
def util_match_language(filestem): | |
code_ok=None | |
# https://en.wikipedia.org/wiki/List_of_ISO_639-2_codes | |
langcodes=[ | |
"eng","fra","ita","por","spa", | |
"chi","jpn","kor","zho", | |
"ara","rus","tur", | |
"cat", | |
] | |
for code in langcodes: | |
if filestem.endswith(f".{code}"): | |
code_ok=code | |
break | |
return code_ok | |
def util_match_format(sfx,video=True,audio=True,subs=True): | |
if video: | |
if sfx in ("avi","flv","m4v","mkv","mp4","mpg","rm","rmvb","webm","wmv"): | |
return True | |
if audio: | |
if sfx in ("aac","ac3","flac","m4a","mka","mp3","ogg","opus","wav","wma"): | |
return True | |
if subs: | |
if sfx in ("ass","srt","ssa","sub"): | |
return True | |
return False | |
def util_timestamp(exact=True): | |
dtobj=datetime.now() | |
text=f"{str(dtobj.month).zfill(2)}-{str(dtobj.month).zfill(2)}-{str(dtobj.day).zfill(2)}" | |
if exact: | |
text=f"{text}-{str(dtobj.hour).zfill(2)}-{str(dtobj.minute).zfill(2)}-{str(dtobj.second).zfill(2)}" | |
return text | |
def util_json_eval(text): | |
try_eval=False | |
try: | |
data=json.loads(text) | |
except: | |
pass | |
else: | |
return data | |
try: | |
assert text.startswith("{") | |
assert text.endswith("}") | |
data=eval(text) | |
except: | |
return {} | |
return data | |
def util_subprocess(line,get_rc=True,get_stdout=True,get_stderr=True): | |
stdout_pipe={True:subprocess.PIPE,False:None}[get_stdout] | |
stderr_pipe={True:subprocess.PIPE,False:None}[get_stderr] | |
print("\nRunning:\n$",line) | |
proc=subprocess.run(line,stdout=stdout_pipe,stderr=stderr_pipe) | |
rc=proc.returncode | |
stdout_ok=None | |
if get_stdout: | |
stdout_ok=proc.stdout.decode() | |
stderr_ok=None | |
if get_stderr: | |
stderr_ok=proc.stderr.decode() | |
results=[] | |
if get_rc: | |
results.append(rc) | |
if get_stdout: | |
results.append(stdout_ok.strip()) | |
if get_stderr: | |
results.append(stderr_ok.strip()) | |
if len(results)==1: | |
return results[0] | |
if len(results)>1: | |
return tuple(results) | |
def util_plcult(thing): | |
if type(thing)==str: | |
return Path(thing) | |
return thing | |
def util_ffline_inputs(fse_list): | |
the_line=[] | |
for fse in fse_list: | |
the_line.extend(["-i",str(fse)]) | |
return the_line | |
def util_ffline_maps(fse_list): | |
index=0 | |
the_line=[] | |
for fse in fse_list: | |
the_line.extend(["-map",f"{index}"]) | |
index=index+1 | |
return the_line | |
def util_ffline_metadata(fse_list_wovideo,sub1): | |
# fse_list_wovideo = list of organized streams without the video stream [1:] | |
# sub1 = the ammount of audio streams before the subtitle streams | |
the_line=[] | |
index_audio=-1 | |
index_subs=-1 | |
pos=-1 | |
maxlen=len(fse_list_wovideo) | |
for fse in fse_list_wovideo: | |
pos=pos+1 | |
lang=util_match_language(fse.stem) | |
if lang==None: | |
continue | |
is_audio=(pos<sub1) | |
if is_audio: | |
index_audio=index_audio+1 | |
if not is_audio: | |
index_subs=index_subs+1 | |
stype,index={True:("a",index_audio),False:("s",index_subs)}[is_audio] | |
the_line.extend([f"-metadata:s:{stype}:{index}",f"language={lang}"]) | |
return the_line | |
def func_unpacker(bin_ffmpeg,bin_ffprobe,path_in,path_out): | |
fse_in=util_plcult(path_in) | |
fse_out=util_plcult(path_out) | |
ffprobe_output=util_subprocess([bin_ffprobe,"-v","warning","-print_format","json","-show_streams",str(fse_in)],get_rc=False,get_stderr=False) | |
ffprobe_json=util_json_eval(ffprobe_output) | |
if not ffprobe_json: | |
print("error: ffprobe failed") | |
return | |
# print(f"ffprobe JSON:\n{json.dumps(ffprobe_json,indent=4)}") | |
str_all=ffprobe_json.get("streams") | |
if not str_all: | |
print("error: ffprobe data not valid") | |
return | |
str_v=[] | |
str_a=[] | |
str_s=[] | |
str_t=[] | |
for s in str_all: | |
c=s.get("codec_type") | |
if not c: | |
continue | |
if c=="video": | |
str_v.append(s) | |
if c=="audio": | |
str_a.append(s) | |
if c=="subtitle": | |
str_s.append(s) | |
if c=="attachment": | |
str_t.append(s) | |
if len(str_v)==0 and len(str_a)==0 and len(str_s)==0 and len(str_t)==0: | |
print("error: No streams ???") | |
return | |
dirname=fse_in.stem | |
fse_out_ok=fse_out.joinpath(dirname) | |
if fse_out_ok.exists(): | |
fse_out_ok=fse_out.joinpath(f"{util_timestamp()} {dirname}") | |
fse_out_ok.mkdir(exist_ok=True,parents=True) | |
print(f"\nFrom file: {fse_in.name}\nTo directory: {fse_out_ok.name}") | |
index=0 | |
for v in str_v: | |
codec=v.get("codec_name") | |
sfx=".mkv" | |
if codec: | |
if codec in ("mpeg4","mpeg"): | |
sfx=".avi" | |
filename=f"video_{index}{sfx}" | |
rc,errors=util_subprocess([bin_ffmpeg,"-y","-v","warning","-probesize","16M","-i",str(fse_in),"-map",f"0:v:{index}","-c","copy",str(fse_out_ok.joinpath(filename))],get_stdout=False) | |
print({ | |
True:f"OK: {filename}", | |
False:f"ERR: {filename}\n{errors}" | |
}[rc==0]) | |
index=index+1 | |
index=0 | |
for a in str_a: | |
codec=a.get("codec_name") | |
if not codec: | |
continue | |
sfx=".mka" | |
if codec in ("mp3","flac"): | |
sfx=f".{codec}" | |
if codec in ("aac","ac3"): | |
sfx=".m4a" | |
if codec in ("opus","vp9","vp8"): | |
sfx=".ogg" | |
if codec=="pcm": | |
sfx=".wav" | |
if codec=="wma": | |
sfx=".wma" | |
extra="" | |
tags=a.get("tags") | |
if tags: | |
language=tags.get("language") | |
if language: | |
extra=f"{extra}.{language}" | |
filename=f"audio_{index}{extra}{sfx}" | |
rc,errors=util_subprocess([bin_ffmpeg,"-y","-v","warning","-probesize","16M","-i",str(fse_in),"-map",f"0:a:{index}","-c","copy",str(fse_out_ok.joinpath(filename))],get_stdout=False) | |
print({ | |
True:f"OK: {filename}", | |
False:f"ERR: {filename}\n{errors}" | |
}[rc==0]) | |
index=index+1 | |
index=0 | |
for a in str_s: | |
codec=s.get("codec_name") | |
if not codec: | |
continue | |
sfx=".ass" | |
if "srt" in codec: | |
sfx=".srt" | |
if "ssa" in codec: | |
sfx=".ssa" | |
if "ass" in codec: | |
sfx=".ass" | |
extra="" | |
tags=a.get("tags") | |
if tags: | |
language=tags.get("language") | |
if language: | |
extra=f"{extra}.{language}" | |
filename=f"sub_{index}{extra}{sfx}" | |
rc,errors=util_subprocess([bin_ffmpeg,"-y","-v","warning","-probesize","16M","-i",str(fse_in),"-map",f"0:s:{index}",str(fse_out_ok.joinpath(filename))],get_stdout=False) | |
print({ | |
True:f"OK: {filename}", | |
False:f"ERR: {filename}\n{errors}" | |
}[rc==0]) | |
index=index+1 | |
index=0 | |
for t in str_t: | |
filename=f"attachment_{index}" | |
tags=s.get("tags") | |
if tags: | |
doubt=tags.get("filename") | |
if doubt: | |
filename=fn_doubt | |
rc,errors=util_subprocess([bin_ffmpeg,"-y","-v","warning","-probesize","16M","-i",str(fse_in),"-map",f"0:s:{index}","-c","copy",str(fse_out_ok.joinpath(filename))],get_stdout=False) | |
print({ | |
True:f"OK: {filename}", | |
False:f"ERR: {filename}\n\nstderr:\n{errors}\n" | |
}[rc==0]) | |
def func_builder(bin_ffmpeg,ipath,opath): | |
fse_in=util_plcult(ipath) | |
fse_out=util_plcult(opath) | |
fse_list=list(fse_in.iterdir()) | |
fse_list.sort() | |
video_src=None | |
audio_src=[] | |
subs_src=[] | |
for fse in fse_list: | |
if not fse.is_file(): | |
continue | |
sfx=fse.suffix.lower()[1:] | |
if video_src==None: | |
if util_match_format(sfx,audio=False,subs=False): | |
video_src=fse | |
continue | |
if util_match_format(sfx,video=False,subs=False): | |
audio_src.append(fse) | |
continue | |
if util_match_format(sfx,video=False,audio=False): | |
subs_src.append(fse) | |
if not video_src: | |
print("Video source not found!") | |
return | |
if len(audio_src)==0 and len(subs_src)==0: | |
print("There are no audio sources, there are no subtitles sources, there is nothing to do") | |
return | |
filename=f"{fse_in.stem}.mkv" | |
fse_out_ok=fse_out.joinpath(filename) | |
fse_out.mkdir(exist_ok=True,parents=True) | |
if fse_out_ok.exists(): | |
fse_out_ok=fse_out.joinpath(f"{util_timestamp()} {filename}") | |
inputs_list=[video_src] | |
inputs_list.extend(audio_src) | |
inputs_list.extend(subs_src) | |
inputs=len(inputs_list) | |
cmd_line=[bin_ffmpeg,"-y","-v","warning"] | |
cmd_line.extend(util_ffline_inputs(inputs_list)) | |
cmd_line.extend(util_ffline_maps(inputs_list)) | |
cmd_line.extend(["-c:v","copy"]) | |
if len(audio_src)>0: | |
cmd_line.extend(["-c:a","copy"]) | |
cmd_line.extend(util_ffline_metadata(inputs_list[1:],len(audio_src))) | |
cmd_line.append(str(fse_out_ok)) | |
msg=f"\nFrom directory: {fse_in.name}\n\n\tDetected files (streams):" | |
for fse in inputs_list: | |
msg=f"{msg}\n\t\t{fse.name}" | |
msg=f"{msg}\n\nOutput file: {fse_out_ok.name}" | |
print(msg) | |
rc,errors=util_subprocess(cmd_line,get_stdout=False) | |
print({ | |
True:f"OK: {fse_out.name}", | |
False:f"ERR: {fse_out.name}\n{errors}" | |
}[rc==0]) | |
def func_encoder(bin_ffmpeg,path_in,path_out): | |
fse_in=util_plcult(path_in) | |
fse_out=util_plcult(path_out) | |
filename=f"{fse_in.stem}.mkv" | |
fse_out_ok=fse_out.joinpath(filename) | |
fse_out.mkdir(exist_ok=True,parents=True) | |
if fse_out_ok.exists(): | |
fse_out_ok=fse_out.joinpath(f"{util_timestamp()} {filename}") | |
ffmpeg_line=[ | |
bin_ffmpeg,"-i",str(fse_in), | |
"-map","0:v", | |
"-map","0:a?", | |
"-map","0:s?", | |
"-map_chapters","0", | |
"-map_metadata","0", | |
"-c:v","libx265", | |
"-preset","slow", | |
"-pix_fmt","yuv420p10le", | |
"-crf","26", | |
"-framerate","24", | |
"-tune","animation", | |
"-c:a","aac", | |
"-b:a","128k", | |
"-c:s","copy", | |
str(fse_out_ok), | |
] | |
print(f"\nFrom file: {fse_in.name}\nTo file: {fse_out_ok.name}") | |
results=util_subprocess(ffmpeg_line) | |
rc=results[0] | |
errors=results[2] | |
print({ | |
True:f"OK: {filename}", | |
False:f"ERR: {filename}\n{errors}\n" | |
}[rc==0]) | |
def func_appender(bin_ffmpeg,path_in,path_out): | |
fse_in=util_plcult(path_in) | |
fse_out=util_plcult(path_out) | |
fse_list_audio=[] | |
fse_list_subs=[] | |
fse_list_dir=list(fse_in.parent.iterdir()) | |
fse_list_dir.sort() | |
for fse in fse_list_dir: | |
if not fse.is_file(): | |
continue | |
if fse.name==fse_in.name: | |
continue | |
if not fse.name.startswith(f"{fse_in.stem}."): | |
continue | |
sfx=fse.suffix.lower()[1:] | |
if util_match_format(sfx,video=False,subs=False): | |
fse_list_audio.append(fse) | |
if util_match_format(sfx,video=False,audio=False): | |
fse_list_subs.append(fse) | |
if len(fse_list_audio)==0 and len(fse_list_subs)==0: | |
print("error: nothing to do") | |
return | |
filename=f"{fse_in.stem}.mkv" | |
fse_out_ok=fse_out.joinpath(filename) | |
fse_out.mkdir(exist_ok=True,parents=True) | |
if fse_out_ok.exists(): | |
fse_out_ok=fse_out.joinpath(f"{util_timestamp()} {filename}") | |
msg=f"\nFrom file (main video stream): {fse_in.name}" | |
if len(fse_list_audio)>0: | |
msg=f"{msg}\n\nAudio stream(s):" | |
for fse in fse_list_audio: | |
msg=f"{msg}\n\t{fse.name}" | |
if len(fse_list_subs)>0: | |
msg=f"{msg}\n\nSubtitle stream(s):" | |
for fse in fse_list_subs: | |
msg=f"{msg}\n\t{fse.name}" | |
msg=f"{msg}\n\nTo file: {fse_out_ok.name}" | |
print(msg) | |
cmd_line=[bin_ffmpeg,"-y","-v","warning","-probesize","16M"] | |
fse_list_allstreams=[fse_in] | |
if len(fse_list_audio)>0: | |
fse_list_allstreams.extend(fse_list_audio) | |
if len(fse_list_subs)>0: | |
fse_list_allstreams.extend(fse_list_subs) | |
cmd_line.extend(util_ffline_inputs(fse_list_allstreams)) | |
cmd_line.extend(util_ffline_maps(fse_list_allstreams)) | |
cmd_line.extend(["-c:v","copy"]) | |
if len(fse_list_audio)>0: | |
cmd_line.extend(["-c:a","copy"]) | |
cmd_line.extend(util_ffline_metadata(fse_list_allstreams[1:],len(fse_list_audio))) | |
cmd_line.append(str(fse_out_ok)) | |
rc,errors=util_subprocess(cmd_line,get_stdout=False) | |
print({ | |
True:f"OK: {fse_out_ok.name}", | |
False:f"ERR: {fse_out_ok.name}\n{errors}\n" | |
}[rc==0]) | |
if __name__=="__main__": | |
import sys | |
bin_ffmpeg="ffmpeg" | |
bin_ffprobe="ffprobe" | |
# The slash from Windows does not work, but UNIX slash works | |
# bin_ffmpeg="D:/path/to/ffmpeg/bin/ffmpeg.exe" | |
# bin_ffprobe="D:/path/to/ffmpeg/bin/ffprobe.exe" | |
commands=("unpack","append","encode","build","build-max") | |
if len(sys.argv)==1: | |
msg="\nUsage: [Command] [Path]\n\nCommands:" | |
for cmd in commands: | |
msg=f"{msg} {cmd}," | |
msg=f"{msg[:-1]}\n\nYou can read the help of each command\n" | |
print(msg) | |
sys.exit(0) | |
cmd=sys.argv[1].lower().strip() | |
if not cmd in commands: | |
print("error: Command not found") | |
sys.exit(1) | |
target_raw="" | |
if len(sys.argv)==2: | |
msg={ | |
"encode":"Requires a path to a video file or a directory, every video file found (non-recursively, of course) will be processed\n\nTranscodes a video file\nThe ffmpeg parameters for this are hardcoded right now", | |
"unpack":"Requires a path to a video file or a directory, every video file found (non-recursively, of course) will be processed\n\nExtracts all video, audio, subtitle streams and attachments from a media file\nBe aware that most of the metadata such as MKV chapters, nice names, etc... are lost in the process but the language is at least preserved in the filenames of the audio and subtitle files", | |
"append":"Requires a path to a video file or a directory, every video file found (non-recursively, of course) will be processed\n\nSome aditional files in the same directory as the main file will be selected, these files must match the name of the main file and they can be audio files and subtitle files, the result is an MKV file\nIf a main file has nothing to be added, the command will not process that file, because there is nothing to add", | |
"build":"Requires a directory; if you want to do batch processing, use 'build-max' instead\n\nFrom a given directory, a single video will be selected as the video stream, all audio files will be selected as audio streams, and the same with the subtitles, the result is an MKV file", | |
"build-max":"Requires a directory that contains directories that can be processed by the 'build' command\n\nBatch processing version for 'build'", | |
}[cmd] | |
print(f"\n{cmd}:\n\n{msg}\n") | |
sys.exit(0) | |
if len(sys.argv)>2: | |
for part in sys.argv[2:]: | |
target_raw=f"{target_raw} {part}" | |
target_fse=Path(target_raw.strip()) | |
if not target_fse.exists(): | |
print("error: path does not exist") | |
sys.exit(1) | |
################################################################################ | |
print(f"\nCommand: {cmd}\n\nSelected path:\n{target_fse}") | |
targets=[] | |
if cmd=="build": | |
if target_fse.is_dir(): | |
targets.append(target_fse) | |
if cmd in ("unpack","encode","append","build-max"): | |
if not cmd=="build-max": | |
if target_fse.is_file(): | |
targets.append(target_fse) | |
if target_fse.is_dir(): | |
targets_tmp=list(target_fse.iterdir()) | |
targets_tmp.sort() | |
for fse in targets_tmp: | |
if cmd=="build-max": | |
if not fse.is_dir(): | |
continue | |
if not cmd=="build-max": | |
if not fse.is_file(): | |
continue | |
if not util_match_format(fse.suffix.lower()[1:],audio=False,subs=False): | |
continue | |
targets.append(fse) | |
if len(targets)==0: | |
print("error: There are no targets") | |
sys.exit(1) | |
msg="\nTarget(s):" | |
if len(targets)==1: | |
msg=f"{msg} {targets[0].name}" | |
if len(targets)>1: | |
for fse in targets: | |
msg=f"{msg}\n\t{fse.name}" | |
print(msg) | |
################################################################################ | |
dt_now=util_timestamp() | |
outdir_prefix={ | |
"unpack":"Unpacker", | |
"encode":"Encoder", | |
"append":"Appender", | |
"build":"Builder", | |
"build-max":"BuilderMax" | |
}[cmd] | |
outdir=target_fse.parent.joinpath(f"{outdir_prefix}.{dt_now}") | |
for fse in targets: | |
if cmd=="unpack": | |
func_unpacker(bin_ffmpeg,bin_ffprobe,fse,outdir) | |
if cmd=="encode": | |
func_encoder(bin_ffmpeg,fse,outdir) | |
if cmd=="append": | |
func_appender(bin_ffmpeg,fse,outdir) | |
if cmd in ("build","build-max"): | |
func_builder(bin_ffmpeg,fse,outdir) | |
print(f"\nOutput directory:\n{outdir}\n") | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment