Skip to content

Instantly share code, notes, and snippets.

@rokibhasansagar
Last active December 5, 2023 11:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rokibhasansagar/5ec201911892ea691e504fe4a283d586 to your computer and use it in GitHub Desktop.
Save rokibhasansagar/5ec201911892ea691e504fe4a283d586 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# Filename: zonefileInfuserZ.py
import argparse
import os
import json
import shutil
import time
import vapoursynth as vs
from vapoursynth import core
# Global VapourSynth core settings, applied as soon as the script is loaded.
# NOTE(review): max_cache_size is presumably expressed in megabytes (8*1024 ~ 8 GiB) - confirm against the VapourSynth docs.
core.max_cache_size = 8*1024
core.num_threads = 4
# Metric back-ends selectable with --method; ssimu2compute() branches on these exact strings.
VALID_METHODS = ['v2_zig', 'v2']
# Command-line interface. Parsing happens at module level, so the options are
# read as soon as the script starts.
parser = argparse.ArgumentParser(prog='VapourSynth zonefile Infuser using SSIMULACRA2 score', description='Calculate SSIMULACRA2 score and Rebuild zonefile in scene json file.')
parser.add_argument('source', help='Source video path. Can be relative to this script or a full path.')
parser.add_argument('encoded', help='Encoded video path. Can be relative to this script or a full path.')
parser.add_argument('-m', '--method', dest='method', choices=VALID_METHODS, default='v2_zig', help='SSIMULACRA method. Default: v2_zig.')
parser.add_argument('-c', '--crf', dest='crf', type=int, default=30, help='CRF Value for 1st Encoded Video to use. Default: 30.')
parser.add_argument('-w', '--width', dest='width', type=int, help='Video width to up/downscale to.')
parser.add_argument('-p', '--height', dest='height', type=int, help='Video height to up/downscale to.')
parser.add_argument('-v', '--variability', dest='variability', type=int, default=4, help='variability from CRF Value for new zonefile. Default: 4.')
parser.add_argument('-s', '--skip', dest='skip', type=int, default=1, help='Frames calculated every (n+1)th frame. Default: 1 (Every other frame is calculated). For example, setting this to 5 will calculate every 6th frame.')
args = parser.parse_args()
# Copy the parsed options into module-level names. ssimu2compute() reads
# method/width/height as globals and generatezonefile() reads skip as a global;
# operation() captures source/encoded/crf/skip as its default arguments.
source: str = args.source
encoded: str = args.encoded
method: str = args.method
crf: int = args.crf
# -w/-p declare no default, so width and height are None unless given.
width: int = args.width
height: int = args.height
# Always an int here (default 4), so the "auto" branch of operation() is not
# reached when the script is run through the call at the bottom of the file.
variability: int = args.variability
skip: int = args.skip
def getscenes():
    """Load ``sc.json`` from the current directory.

    Every JSON ``null`` value in the file is returned as the integer ``0``
    (for example an unset ``zone_overrides`` field). The substitution is done
    on the parsed data rather than on the raw text, so keys and strings that
    merely contain the letters "null" are left intact.

    Returns:
        The decoded JSON document with all ``None`` values replaced by ``0``.

    Raises:
        OSError: if ``sc.json`` cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    def _nulls_to_zero(node):
        # Recursively rebuild containers, swapping None for 0.
        if node is None:
            return 0
        if isinstance(node, dict):
            return {key: _nulls_to_zero(value) for key, value in node.items()}
        if isinstance(node, list):
            return [_nulls_to_zero(item) for item in node]
        return node

    with open("sc.json", encoding="utf-8") as file:
        return _nulls_to_zero(json.load(file))
def ssimu2compute(sourcefile, distortedfile, skip = 1):
    """Compute per-frame SSIMULACRA2 scores of a distorted clip against its source.

    Reads the module-level ``method``, ``width`` and ``height`` values.

    Args:
        sourcefile: Path of the reference video.
        distortedfile: Path of the encoded video to score.
        skip: Number of frames skipped between two scored frames; when
            greater than 0 only every ``skip + 1``-th frame is evaluated.

    Returns:
        A list of ``[frame_number, score]`` pairs, where ``frame_number`` is
        the index in the unsampled clip. Entries whose score is not strictly
        positive are dropped.

    Raises:
        SystemExit: with status 1 if ``method`` is neither ``'v2'`` nor
            ``'v2_zig'``.
    """
    # Reject an unknown method before the clips are opened.
    if method not in ('v2', 'v2_zig'):
        print(f'Invalid method: {method}')
        # Raised directly: the exit() helper is injected by the site module
        # and is not guaranteed to be available.
        raise SystemExit(1)

    src = core.lsmas.LWLibavSource(sourcefile, threads=0, cache=0)
    dis = core.lsmas.LWLibavSource(distortedfile, threads=0, cache=0)

    # Distance between two scored frames in the original clip. The clips are
    # only thinned out when skip > 0, so any other value means every frame is
    # scored and the step must be 1.
    step = skip + 1 if skip > 0 else 1
    if step > 1:
        src = src.std.SelectEvery(cycle=step, offsets=0)
        dis = dis.std.SelectEvery(cycle=step, offsets=0)

    # Adjust format and color space for selected method
    if method == 'v2':
        # Only works with RGB24 format.
        src = src.resize.Bicubic(width=width, height=height, format=vs.RGB24, matrix_in_s='709')
        dis = dis.resize.Bicubic(width=width, height=height, format=vs.RGB24, matrix_in_s='709')
        result = src.julek.SSIMULACRA(dis, feature=0)
    else:
        # Only works with RGBS format.
        src = src.resize.Bicubic(width=width, height=height, format=vs.RGBS, matrix_in_s='709')
        dis = dis.resize.Bicubic(width=width, height=height, format=vs.RGBS, matrix_in_s='709')
        # Must be converted from gamma to linear with fmtc because resize/zimg uses another formula.
        src = src.fmtc.transfer(transs="srgb", transd="linear", bits=32)
        dis = dis.fmtc.transfer(transs="srgb", transd="linear", bits=32)
        result = src.ssimulacra2.SSIMULACRA2(dis)

    res = [[ind * step, fr.props["_SSIMULACRA2"]] for (ind, fr) in enumerate(result.frames())]
    return [k for k in res if k[1] > 0]
def statistics(intlist):
    """Return summary statistics for a non-empty sequence of numbers.

    Args:
        intlist: Sequence of numeric values (must support ``len``).

    Returns:
        A tuple ``(avg, deviation, median, p5, p95)`` where ``deviation`` is
        the population standard deviation and the last three entries are the
        elements of the sorted data at indices ``n // 2``, ``n // 20`` and
        ``19 * n // 20``.

    Raises:
        ValueError: if ``intlist`` is empty.
    """
    count = len(intlist)
    if count == 0:
        raise ValueError("statistics() requires at least one value")
    avg = sum(intlist) / count
    variance = sum(k * k for k in intlist) / count - avg * avg
    # Rounding can push the variance of (nearly) constant data just below
    # zero, and a negative float raised to 0.5 yields a complex number.
    deviation = max(0.0, variance) ** 0.5
    ordered = sorted(intlist)
    return (avg, deviation, ordered[count // 2], ordered[count // 20], ordered[19 * count // 20])
def cutssimu2byscene(scores, scenes):
    """Group SSIMULACRA2 scores by the scene their frame belongs to.

    Args:
        scores: List of ``[frame_number, score]`` entries ordered by frame.
        scenes: Dict with a ``"scenes"`` list whose items carry
            ``"start_frame"`` and ``"end_frame"``.

    Returns:
        One list of scores per scene, in scene order. A score whose frame
        lies before the start of the scene currently being filled is
        reported on stdout and skipped.
    """
    total = len(scores)
    cursor = 0
    per_scene = []
    print(f'len(scores) = {total}\n')
    for scene in scenes["scenes"]:
        bucket = []
        per_scene.append(bucket)
        # Consume scores until one reaches the end of the current scene.
        while cursor < total:
            entry = scores[cursor]
            if entry[0] >= scene["end_frame"]:
                break
            if entry[0] < scene["start_frame"]:
                print("There is an error somewhere in scenes or in scores")
            else:
                bucket.append(entry[1])
            cursor += 1
    return per_scene
def generatezonefile(scores, oldcrf, variability = 8):
    """Derive a per-scene CRF from the scores and store it in the scene files.

    For every scene in ``sc.json`` the mean score is compared with the global
    mean; the normalised deviation shifts ``oldcrf`` by roughly
    ``variability`` at most on the low side, and the result is clamped to
    0..63. One line per scene is appended to ``zonefile.txt`` and
    ``sc.json`` is rewritten with a ``zone_overrides`` entry per scene.
    When the module-level ``skip`` is 0, the per-scene score lists are also
    appended to ``scorefile.txt``.

    Args:
        scores: List of ``[frame_number, score]`` entries.
        oldcrf: CRF used for the encode that was scored.
        variability: Scale of the CRF adjustment.

    Returns:
        None. If ``sc.json`` does not exist a message is printed and nothing
        is written.
    """
    if not os.path.isfile("sc.json"):
        print("Error for generating zone file, no scene file")
        return
    scenes = getscenes()
    cutssimu2 = cutssimu2byscene(scores, scenes)
    # To see if scoring is working or not...
    print(f"\nMean SSIMU2 cut per-scene: {cutssimu2}")
    # Save the per-scene frame score in a file if every frame is measured
    if (skip == 0):
        with open("scorefile.txt", "a") as scorefile:
            scorefile.write(str(f'{cutssimu2}')+"\n")
    oldstats = statistics([k[1] for k in scores]) # (mean, standard deviation, median, 5th percentile, 95th percentile)
    # A scene without any sampled frame is treated as having the global mean.
    for sceneind, scenescores in enumerate(cutssimu2):
        if not scenescores:
            scenescores.append(oldstats[0])
            print(f"Warning: scene {sceneind}/{len(cutssimu2)} not having any ssimu2 value: ", scenes["scenes"][sceneind]["start_frame"], scenes["scenes"][sceneind]["end_frame"])
    means = [sum(scenescores)/len(scenescores) for scenescores in cutssimu2]
    # With zero global deviation every scene sits exactly on the mean, so the
    # deviation is defined as 0 instead of dividing by zero.
    spread = oldstats[1]
    devs = [(mean - oldstats[0])/spread if spread else 0.0 for mean in means]
    sdevs = sorted(devs)
    # Normaliser: larger magnitude of the 1st and 99th percentile deviation.
    # It does not depend on the scene, so it is computed once.
    if sdevs:
        extreme = max(abs(sdevs[99*len(sdevs)//100]), abs(sdevs[1*len(sdevs)//100]))
    else:
        extreme = 0
    with open("zonefile.txt", "a") as zonefile:
        for sceneind, scene in enumerate(scenes["scenes"]):
            # All deviations being zero means no scene needs an adjustment.
            relative = devs[sceneind]/extreme if extreme else 0.0
            newcrf = str(min(63, max(0, oldcrf+round(variability/4+max(-variability, relative*variability)))))
            z = str(f'{scene["start_frame"]} {scene["end_frame"]} aom --cq-level={newcrf}')
            zonefile.write(z+"\n")
            scene["zone_overrides"] = {"encoder":"aom","passes":2,"min_scene_len":42,"video_params":["--cq-level="+newcrf]}
    # Serialise before opening so a failure cannot leave sc.json truncated.
    res = json.dumps(scenes)
    with open("sc.json", "w") as file:
        file.write(res)
# Still unused
def resetzonefile():
    """Clear the ``zone_overrides`` of every scene in ``sc.json``.

    The file is loaded through ``getscenes()``, each scene's
    ``zone_overrides`` is set to JSON ``null`` and the document is written
    back. Values that ``getscenes()`` returned as ``0`` in place of a null
    elsewhere in the document are written back as ``0``.
    """
    scenes = getscenes()
    for scene in scenes["scenes"]:
        # None serialises to JSON null directly; no text substitution on the
        # dumped document is needed, so genuine "null" strings stay intact.
        scene["zone_overrides"] = None
    res = json.dumps(scenes)
    with open("sc.json", "w") as file:
        file.write(res)
def operation(source = source, encoded = encoded, crf = crf, variability = "auto", skip = skip):
    """Score the encode against the source and generate the zone overrides.

    The defaults for ``source``, ``encoded``, ``crf`` and ``skip`` are the
    module-level values at the time the function is defined.

    Args:
        source: Path of the reference video.
        encoded: Path of the encoded video.
        crf: CRF used for the encoded video.
        variability: Scale of the CRF adjustment, or ``"auto"`` to derive it
            from ``crf`` and the standard deviation of the scores.
        skip: Number of frames skipped between two scored frames.

    Returns:
        None. The summary statistics are appended to ``scoreresult.txt`` and
        the zone data is written by ``generatezonefile()``. If an input file
        is missing or no score is available, a message is printed and
        nothing is generated.
    """
    if not os.path.isfile(source):
        print("Couldnt find ", source)
        return
    # The encoded file is needed just as much as the source.
    if not os.path.isfile(encoded):
        print("Couldnt find ", encoded)
        return
    scores1 = ssimu2compute(source, encoded, skip=skip)
    print("SSIMU2 computation done")
    # statistics() cannot summarise an empty list; stop with a clear message.
    if not scores1:
        print("No usable SSIMULACRA2 scores were produced, zonefile not generated")
        return
    stats1 = statistics([k[1] for k in scores1])
    with open("scoreresult.txt", "a") as scoreresult:
        scoreresult.write(str(f'{stats1}'))
    print(f"\nFirst scores: {stats1}\n")
    if variability == "auto":
        # Scale with the base CRF and the spread (standard deviation) of the scores.
        variability = 15*crf/30*stats1[1]/8
    print(f'variability is now {variability}\n')
    generatezonefile(scores1, crf, variability = variability)
    print("\nRun Second Pass Now")
# Script entry point: run the whole analysis right away with the values parsed
# from the command line. variability is passed explicitly, so operation()'s
# "auto" default is not used on this path.
operation(source = source, encoded = encoded, crf = crf, variability = variability, skip = skip)
# End of script.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment