Skip to content

Instantly share code, notes, and snippets.

@GiovanH
Created January 4, 2023 23:35
Show Gist options
  • Save GiovanH/8600a26cbb5172d6e42b57b84665e4ee to your computer and use it in GitHub Desktop.
Save GiovanH/8600a26cbb5172d6e42b57b84665e4ee to your computer and use it in GitHub Desktop.
Converts extracted audio and video assets from the game Not For Broadcast into usable MKV files
import glob
import os
import subprocess
import parallel_threads
import itertools
import pprint
def getVideoChapter(mp4):
    """Return the chapter key under which a video clip is grouped.

    The key is derived from the file name with its directory and extension
    removed.  Names starting with "Ad", "Coming Up" or "Disrupt Hack" are
    their own key; for every other name the key is the text before the
    first space.
    """
    stem = os.path.splitext(os.path.basename(mp4))[0]
    # These clips are keyed by their whole name rather than by a prefix.
    if stem.startswith(("Ad", "Coming Up", "Disrupt Hack")):
        return stem
    chapter, _, _ = stem.partition(" ")
    return chapter
def getAudio(mp4):
    """Return the path of the audio clip in ``AudioClip/`` that belongs to *mp4*.

    The video's file name, without directory and extension, is used as a
    prefix: if exactly one entry in ``AudioClip/`` starts with it, that entry
    is returned.  Otherwise (no match, or several) a diagnostic line is
    printed and the lookup falls back to the exact name
    ``"<name> Audio.wav"``.

    Raises:
        FileNotFoundError: if neither lookup yields exactly one file.
    """
    path, filename = os.path.split(mp4)
    filep, ext = os.path.splitext(filename)
    # Escape glob metacharacters so names containing "[", "?" or "*" are
    # matched literally instead of being interpreted as patterns.
    stem = glob.escape(filep)

    matches = glob.glob(f"AudioClip/{stem}*")
    # Explicit checks rather than ``assert``: asserts are removed under
    # ``python -O``, which would silently disable the fallback below.
    if len(matches) == 1:
        return matches[0]

    print(path, filename, f"AudioClip/{filep} Audio.wav")
    matches = glob.glob(f"AudioClip/{stem} Audio.wav")
    if len(matches) != 1:
        raise FileNotFoundError(
            f"no unique audio clip for {mp4!r} in AudioClip/"
        )
    return matches[0]
def writeMerged(video):
    """Mux one video clip with its audio clip into ``merged/<file name>.mkv``.

    The output name is the video's file name (extension included) with
    ``.mkv`` appended.  Nothing is done when that file already exists;
    otherwise the audio clip is looked up with ``getAudio`` and ffmpeg
    copies both inputs' streams into the new container without re-encoding.
    """
    target = os.path.join("merged", os.path.basename(video) + '.mkv')
    if os.path.exists(target):
        return
    soundtrack = getAudio(video)
    ffmpeg_args = ['ffmpeg', '-i', video, '-i', soundtrack, '-c', 'copy', target]
    subprocess.run(ffmpeg_args)
def dedupe(l):
    """Return a new list holding the first occurrence of each item of *l*.

    Order is preserved and *l* itself is left unmodified.  Items are
    compared with ``in`` on a list, so unhashable items are accepted.
    """
    unique = []
    for item in l:
        if item in unique:
            continue
        unique.append(item)
    return unique
def flatlist(nested):
    """Flatten one level of nesting: concatenate the iterables in *nested* into a list."""
    flattened = itertools.chain.from_iterable(nested)
    return list(flattened)
def writeMergedGroup(key, group, ex=None, clobber=False):
    """Composite a group of video feeds, plus their audio, into ``merged/<key>.mkv``.

    Every video in *group* becomes one tile of a single composite picture.
    The de-duplicated audio clips (looked up with ``getAudio``) are written
    as separate audio streams titled "Camera N".  When two to four distinct
    audio clips exist, an additional mixed stream titled "Composite" is
    placed in front of them.

    Tile layout by number of videos:
        1   passed through unchanged
        2   side by side
        3   two on top, the third on the bottom row padded to double width
            (keys starting with "H": the first on top, padded, and the
            other two on the bottom row)
        4   2x2 grid
        8   4x2 grid

    Args:
        key: Output name; the file is written to ``merged/<key>.mkv``.  Keys
            starting with "H" select the headline-specific filters.
        group: Video file paths, in tile order.
        ex: Optional list of extra ffmpeg arguments, inserted just before
            the ``-map`` options.
        clobber: Run ffmpeg even when the output file already exists.

    Raises:
        KeyError: if the number of videos or of distinct audio clips has no
            filter defined (supported counts: 1, 2, 3, 4 and 8).
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero
            status; a partially written output file is removed first.
    """
    # ``None`` instead of a mutable ``[]`` default; both mean "no extra args".
    extra_args = list(ex) if ex else []

    av = [(getAudio(video), video) for video in group]
    audio_tracks = dedupe([audio for audio, video in av])
    outpath = os.path.join("merged", key + '.mkv')

    # Video filter graph per number of input videos; each one ends in the
    # [composite] label that is mapped to the output below.
    video_filter = {
        1: ['-filter_complex', 'null[composite]'],
        2: ['-filter_complex',
            "[0:v][1:v]hstack=inputs=2[composite]"],
        3: ['-filter_complex',
            "[0:v][1:v]hstack=inputs=2[top];"
            "[2:v]pad=width=2*iw:height=ih:x=0:y=0[bottom];"
            "[top][bottom]vstack=inputs=2[composite]"],
        4: ['-filter_complex',
            "[0:v][1:v]hstack=inputs=2[top];"
            "[2:v][3:v]hstack=inputs=2[bottom];"
            "[top][bottom]vstack=inputs=2[composite]"],
        8: ['-filter_complex',
            "[0:v][1:v][2:v][3:v]hstack=inputs=4[top];"
            "[4:v][5:v][6:v][7:v]hstack=inputs=4[bottom];"
            "[top][bottom]vstack=inputs=2[composite]"]
    }

    # Audio inputs are passed to ffmpeg after all video inputs, so the first
    # audio file has input index ``vl``.
    vl = len(av)
    audio_filter = {
        1: [],  # Single track: nothing to mix
        2: ['-filter_complex',
            f"[{vl + 0}:a][{vl + 1}:a]amerge[acomposite]"],  # Stereo
        3: ['-filter_complex',
            f"[{vl + 0}:a][{vl + 1}:a][{vl + 2}:a]amix=inputs=3[acomposite]"],
        4: ['-filter_complex',
            f"[{vl + 0}:a][{vl + 1}:a][{vl + 2}:a][{vl + 3}:a]amix=inputs=4[acomposite]"],
        8: [],  # No mixed track is produced for eight clips
    }

    # Special cases to make headlines nicer
    if key.startswith("H"):
        video_filter[3] = [  # Gap for #2 feed
            '-filter_complex',
            "[0:v]pad=width=2*iw:height=ih:x=0:y=0[top];"
            "[1:v][2:v]hstack=inputs=2[bottom];"
            "[top][bottom]vstack=inputs=2[composite]"
        ]
        audio_filter[3] = [
            '-filter_complex',
            f"[{vl + 0}:a][{vl + 1}:a]amerge[stereo];"  # Mix headlines into right/left stereo
            f"[stereo][{vl + 2}:a]amix=inputs=2[acomposite]"  # Mix stereo mix on top of main feed
        ]
        audio_filter[4] = [
            '-filter_complex',
            f"[{vl + 0}:a][{vl + 1}:a]amerge[stereo];"  # Mix headlines into right/left stereo
            f"[stereo][{vl + 2}:a][{vl + 3}:a]amix=inputs=3[acomposite]"  # Mix stereo mix on top of main feed
        ]

    multi_audio = len(audio_filter[len(audio_tracks)]) > 1
    # If multi_audio, there is a composite track at the front, and the
    # per-camera stream indices need to be increased by one.
    multi_audio_offset = 1 if multi_audio else 0

    command = (
        ['ffmpeg'] +
        ['-hwaccel', 'cuda'] +
        # Videos, composited based on the video count
        flatlist([['-i', video] for audio, video in av]) +
        video_filter[len(av)] +
        # Audio tracks, composited based on audio count
        flatlist([['-i', audio] for audio in audio_tracks]) +
        audio_filter[len(audio_tracks)] +
        extra_args +
        # Output composite video
        ['-map', "[composite]"] +
        ['-metadata:s:v:0', 'title=Composite'] +
        # Output composite and individual audio
        (['-map', "[acomposite]", '-metadata:s:a:0', 'title=Composite'] if multi_audio else []) +
        flatlist([
            ['-map', f'{vl + i}']
            for i, a in enumerate(audio_tracks)
        ]) +
        flatlist([
            [f'-metadata:s:a:{i + multi_audio_offset}', f'title=Camera {i + 1}']
            for i, a in enumerate(audio_tracks)
        ]) +
        ['-y', outpath, '-hide_banner']
    )

    if clobber or not os.path.exists(outpath):
        pprint.pprint(command)
        try:
            subprocess.run(command, check=True)
        except BaseException:
            # Remove a partial output (also on Ctrl-C) so a later run does
            # not mistake it for a finished file.  ffmpeg may have failed
            # before creating it; a missing file must not mask the real error.
            try:
                os.unlink(outpath)
            except FileNotFoundError:
                pass
            raise
# Driver: composite every chapter found in VideoClip/ into merged/<chapter>.mkv.
os.makedirs("merged", exist_ok=True)

# Alternative (disabled): mux each clip on its own with
# writeMerged(video) for every video in glob.glob("VideoClip/*.mp4").

# itertools.groupby only merges *adjacent* items and glob.glob makes no
# ordering promise, so sort by the grouping key first; otherwise a chapter
# can be split into several partial groups.
video_files = sorted(glob.glob("VideoClip/*.mp4"), key=getVideoChapter)

for k, g in itertools.groupby(video_files, getVideoChapter):
    # Paths without "Choice" (find() == -1) sort first, then by path.
    group = sorted(g, key=lambda clip: (clip.find("Choice"), clip))

    if k.startswith("0L-03-02"):
        if k in ("0L-03-02A", "0L-03-02B", "0L-03-02C"):
            # insert arbitrary lockdown Megan to make Jeremy fit
            group.insert(0, os.path.normpath(f"VideoClip/{k}-03A Camera 01.mp4"))
        else:
            print("SKIPPING", k, group)
            continue

    if k.startswith("H05-01"):
        # Day 5 Headlines (written per state below)
        continue

    if k.startswith(("H0L-04", "H0L-06")):
        # Day L Headlines (written per state below)
        continue

    print(k, group)
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if len(group) > 4:
        raise ValueError(f"chapter {k!r} has {len(group)} feeds; at most 4 expected")
    writeMergedGroup(k, group)

# Globs below are sorted so the feed order does not depend on the order in
# which the file system happens to list the directory.

# Lockdown Megan
writeMergedGroup(
    "0L-03-03",
    sorted(glob.glob("VideoClip/0L-03-02A-03*.mp4")),
    ex=['-ss', '00:02:30'],
)

# Day 5 Headlines
for state in "ABC":
    writeMergedGroup(
        f"H05-01 {state}",
        [
            *sorted(glob.glob("VideoClip/H05-01 Intro.mp4")),
            *sorted(glob.glob(f"VideoClip/H05-0*{state}.mp4")),
        ],
    )

# Day L Headlines
for state in "ABCD":
    writeMergedGroup(
        f"H0L-04 {state}",
        [
            *sorted(glob.glob("VideoClip/H0L-04 Intro*.mp4")),
            *sorted(glob.glob(f"VideoClip/H0L-04 Choice*{state}.mp4")),
        ],
    )

for state in "ABCD":
    writeMergedGroup(
        f"H0L-06 {state}",
        [
            *sorted(glob.glob("VideoClip/H0L-06 Intro*.mp4")),
            *sorted(glob.glob(f"VideoClip/H0L-06 Choice*{state}.mp4")),
        ],
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment