Last active
March 2, 2024 21:02
-
-
Save RyougiKukoc/4cfdfc8796ae8c4761ec6e4f802208fc to your computer and use it in GitHub Desktop.
Split, Encode then Merge for closed GOP hevc or avc file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# In my (and Magic-Raws') configuration, mkvmerge is on the system PATH,
# VSPipe.exe is in sys.prefix, and x265 (plus x264) is in sys.prefix\x26x, so those locations are the defaults in this function.
def SEM(
    fp_vc_input: str,
    segment_list: list,
    fp_qpfile: str,
    x26x_param: str,
    fp_vpy: str,
    fp_vc_output: str,
    force_expand: bool = True
):
    """
    Split, Encode then Merge for closed GOP hevc or avc file.

    The input stream is muxed into a temporary mkv, every requested segment is
    re-encoded from the vapoursynth script, and the untouched parts of the
    original stream are spliced back around the new segments with mkvmerge.
    All temporary files are created in the current working directory.

    :param fp_vc_input: file path to the input hevc or avc file
    :param segment_list: [[l1, r1], [l2, r2], ...] describes all the segments you want to repair
    :param fp_qpfile: origin qpfile when encode the input hevc or avc file
    :param x26x_param: specify x264 or x265 encode command except --output, --input (input), and --qpfile
    :param fp_vpy: file path to the vapoursynth script you redo
    :param fp_vc_output: file path to the output hevc or avc file
    :param force_expand: whether to expand the endpoint to Iframe for segments in segment_list;
        when False the segments are used as given and are treated as [Iframe1, Iframe2)
    :raises ValueError: if the input extension is not supported or segment_list is empty
    """
    import os
    import sys
    from vapoursynth import core

    valid_exts = ['.hevc', '.avc', '.265', '.264']
    ext = os.path.splitext(fp_vc_input)[1]
    if ext not in valid_exts:
        raise ValueError('Input file invalid.')
    if not segment_list:
        # Nothing to re-encode; fail early instead of half-way through the merge.
        raise ValueError('segment_list is empty.')

    if force_expand:
        iframe_segment_list = expand_segment_to_iframe(fp_vc_input, segment_list)
    else:
        # Without expansion the caller's segments are used directly.
        iframe_segment_list = segment_list
    iframe_segment_list = sort_segment(iframe_segment_list)

    # Only the frame number (first field) of every qpfile line is needed;
    # blank lines are skipped and any trailing fields are ignored.
    qp = []
    with open(fp_qpfile, "r") as f:
        for line in f:
            fields = line.split()
            if fields:
                qp.append(int(fields[0]))

    # Default where VSPipe.exe, mkvmerge.exe, and x26x placed
    # Todo: specify VSPipe and mkvmerge filepath.
    os.environ['PATH'] = os.pathsep.join(
        [os.environ.get('PATH', ''), os.path.join(sys.prefix, 'x26x'), sys.prefix]
    )

    file = '_tomerge.mkv'
    os.system(f'mkvmerge -o "{file}" "{fp_vc_input}"')

    def encode_segment(first: int, stop: int) -> None:
        # Re-encode frames [first, stop) of the script into _newseg{ext}.
        # x26x_param is a user supplied command line, so a shell pipe is required.
        os.system(f'VSPipe "{fp_vpy}" -c y4m -s {first} -e {stop - 1} - | ' +
                  f'{x26x_param} --qpfile "tmp_qp.qpfile" -o "_newseg{ext}" -')

    # For segment [Iframe1, Iframe2], mkvmerge will cut [-, Iframe1) and [Iframe2, -].
    # So you need to re-encode [Iframe1, Iframe2) and pay attention to qp belong to this segment.
    # Assume the origin qp equals to Qx, in this segment it will be Qx-Iframe1
    # If the last segment [Iframe1, Iframe2] reaches the end of the video, Iframe2 = clip.num_frames
    qp_idx = 0
    last_Iframe = 0
    # Tracks whether _lastseg.mkv was produced by THIS run, so that a stale
    # file left over from an earlier aborted run is never merged in.
    have_lastseg = False
    for seg in iframe_segment_list:
        Iframe1, Iframe2 = seg[0], seg[1]

        # Collect the qpfile entries that fall inside this segment, rebased to
        # the segment start.  qp is consumed in ascending order across segments.
        tmp_qp = []
        while qp_idx < len(qp):
            Qx = qp[qp_idx]
            if Qx < Iframe1:
                qp_idx += 1
            elif Qx < Iframe2:
                tmp_qp.append(Qx - Iframe1)
                qp_idx += 1
            else:
                break
        with open("tmp_qp.qpfile", "w") as f:
            f.write("\n".join(f"{i} K" for i in tmp_qp))

        if Iframe1 == 0:
            encode_segment(Iframe1, Iframe2)
            os.system(f'mkvmerge -o "_lastseg.mkv" "_newseg{ext}"')
        else:
            # Judge whether you have [0, last_Iframe) in _lastseg.mkv
            if have_lastseg:
                os.system(f'mkvmerge -o "_newseg.mkv" --split parts-frames:{last_Iframe+1}-{Iframe1+1} "{file}"')
                os.system(f'mkvmerge -o "_last.mkv" "_lastseg.mkv" + "_newseg.mkv"')
            else:
                os.system(f'mkvmerge -o "_last.mkv" --split parts-frames:{last_Iframe+1}-{Iframe1+1} "{file}"')
            # Now you have [0, Iframe1) in _last.mkv
            encode_segment(Iframe1, Iframe2)
            os.system(f'mkvmerge -o "_newseg.mkv" "_newseg{ext}"')
            os.system(f'mkvmerge -o "_lastseg.mkv" "_last.mkv" + "_newseg.mkv"')
        have_lastseg = True
        last_Iframe = Iframe2

    # Append the untouched tail of the original stream, if there is one.
    if last_Iframe != core.lsmas.LWLibavSource(file).num_frames:
        os.system(f'mkvmerge -o "_newseg.mkv" --split parts-frames:{last_Iframe+1}- "{file}"')
        os.system(f'mkvmerge -o "_last.mkv" "_lastseg.mkv" + "_newseg.mkv"')
        os.system(f'mkvextract "_last.mkv" tracks 0:"{fp_vc_output}"')
    else:
        os.system(f'mkvextract "_lastseg.mkv" tracks 0:"{fp_vc_output}"')

    # Not every temporary file exists on every path (e.g. a single segment
    # covering the whole clip never creates _last.mkv), so a missing file is
    # deliberately ignored here.
    temp_files = (
        file,
        "tmp_qp.qpfile",
        f"_newseg{ext}",
        "_lastseg.mkv",
        "_last.mkv",
        "_newseg.mkv",
        "_tomerge.mkv.lwi",
    )
    for temp_file in temp_files:
        try:
            os.remove(temp_file)
        except FileNotFoundError:
            pass
def expand_segment_to_iframe(vc_filepath: str, segment_list: list):
    """
    Expand every segment so that both endpoints sit on key frames.

    ffprobe (looked up on the system PATH) lists the key-frame flag of every
    frame of the first video stream.  For each segment the left endpoint is
    moved back to the nearest key frame at or before it, and the right endpoint
    is moved forward to the nearest key frame at or after it.  A right endpoint
    that is already a key frame is left unchanged; if no key frame follows, the
    right endpoint becomes the total number of frames.

    :param vc_filepath: file path to the hevc or avc file to probe
    :param segment_list: [[l1, r1], [l2, r2], ...] frame-index segments
    :return: list of expanded [l, r] segments, in the same order as the input
    :raises ValueError: if an endpoint is outside the clip or l > r
    :raises RuntimeError: if ffprobe fails or reports no frame list
    """
    import subprocess
    import xml.etree.ElementTree as xet

    if not segment_list:
        # Nothing to expand, so skip the (slow) full-file probe.
        return []

    # Argument list instead of a shell string: the path needs no quoting and
    # the XML is read from stdout rather than from a temporary file.
    probe = subprocess.run(
        [
            'ffprobe', '-hide_banner', '-v', 'error', '-threads', 'auto',
            '-show_frames', '-show_entries', 'frame=key_frame',
            '-of', 'xml', '-select_streams', 'v:0', '-i', vc_filepath,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if probe.returncode != 0:
        message = probe.stderr.decode(errors='replace').strip()
        raise RuntimeError(f'ffprobe failed on "{vc_filepath}": {message}')

    frames = xet.fromstring(probe.stdout).find('frames')
    if frames is None:
        raise RuntimeError(f'ffprobe reported no frames for "{vc_filepath}"')
    is_key = [frame.get('key_frame') == '1' for frame in frames]
    num_frames = len(is_key)

    iseg_list = []
    for seg in segment_list:
        l, r = seg[0], seg[1]
        if l < 0 or l >= num_frames or r < 0 or r >= num_frames or l > r:
            raise ValueError(f'Invalid segment [{l}, {r}]')
        while l > 0 and not is_key[l]:
            l -= 1
        while r < num_frames and not is_key[r]:
            # if the last frame (num_frames - 1) is not an I Frame, r should be num_frames
            r += 1
        iseg_list.append([l, r])
    return iseg_list
def sort_segment(segment_list: list):
    """
    Order segments by their left endpoint and fuse overlapping ones.

    Two segments are fused when the later one starts at or before the current
    right endpoint, so segments that merely touch are joined as well.

    :param segment_list: [[l1, r1], [l2, r2], ...] in any order
    :return: new list of [l, r] lists, sorted and free of overlaps
    """
    ordered = sorted(segment_list, key=lambda seg: seg[0])
    fused = []
    for current in ordered:
        start, stop = current[0], current[1]
        if fused and start <= fused[-1][1]:
            # Overlaps (or touches) the segment being built: extend it.
            if stop >= fused[-1][1]:
                fused[-1][1] = stop
        else:
            fused.append([start, stop])
    return fused
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment