Skip to content

Instantly share code, notes, and snippets.

@MadMcCrow
Created December 11, 2022 13:35
Show Gist options
  • Save MadMcCrow/962fc0310a5633bb80c36e7ab06abbce to your computer and use it in GitHub Desktop.
Save MadMcCrow/962fc0310a5633bb80c36e7ab06abbce to your computer and use it in GitHub Desktop.
A nice tool to convert and optimize your videos
#!/usr/bin/python
# A simple script to compress videos and gain space on your drives
# TODO :
# Add a way to set parameters
# work with a Video-class to simplify parsing
import os
import shlex
import datetime
import time
import sys
import math
import subprocess
import re
# this script constant parameters:
# @note : these could be cli parameters
CPU_USAGE = 0.95
COMP_FACTOR = 0.8
MAX_SIZE_HEIGHT = 1440
FILE_TAG = ""
PRESET = "ultrafast"
CRF = 21
EXTENSIONS = ["mp4", "mkv", "avi"]
CODEC = "libx265"
BAR_LENGTH = 20
# various ffmpeg regex
_durex = r".*Duration: (\d{2}):(\d{2}):(\d{2}).(\d{2}).*bitrate: (\d*) kb/s"
_framex = r".*frame=\s*(\d{1,}).*"
_forex = r".*Stream.*Video: (\S+) .*, (\d{3,})x(\d{3,}).*, (\d*) kb/s, (\d+\.?\d*) fps"
#
# @func help
# explains how to use
#
def help() :
print('''
this script compresses all your videos.
it can take two inputs :
- first the path of the folder containing all your videos
- second the path of the folder you want to output to
example :
python3 ./convert.py ./inputs ./outputs
'''
)
#
# @func getThreads
# get a reasonable amount of CPU power
#
def getThreads() -> int:
return math.floor(os.cpu_count() * CPU_USAGE)
#
# @func getSize
# get the size of a file
#
def getSize(file_path :str) -> int:
try :
return int(os.path.getsize(file_path))
except:
return 0
#
# @func parseDuration
# Convert the 4 values given by ffmpeg to a timedelta
#
def parseDuration(hr, mn, sc, ct) :
return datetime.timedelta(
microseconds = int(ct) * 10000,
seconds = int(sc),
minutes = int(mn),
hours = int(hr))
#
# @func findFiles
# get all the video files in a path
# @param filepath the path you want files from
# @param ext the extension of files
#
def findFiles(filepath : str, ext :str) -> list:
glob = []
for cur, _dirs, files in os.walk(filepath):
glob = glob + ['/'.join([cur,itr]) for itr in files if str(itr).endswith(ext)]
return glob
#
# Displays a nice progressbar for
#
def progressBar(ffmpeg_output, total_frame_count) :
try:
fr = re.search(_framex, ffmpeg_output)
if fr :
framecount = int(fr.group(1))
percent = framecount / (total_frame_count + 1)
bar = "".join(["#" for i in range(0,int(BAR_LENGTH * percent))] + ["-" for i in range(int(BAR_LENGTH * percent) + 1, BAR_LENGTH)])
print(f"{round(percent *100)}% |{bar}|", end='\r')
except:
pass
#
# @func ffprobe
# give you key information about your videos
#
def ffprobe(file) :
# the regexes:
# retval
data = {}
#cmd
lines = str(subprocess.run(shlex.split("ffprobe {}".format('"{}"'.format(file))), stdout = subprocess.PIPE, stderr = subprocess.STDOUT).stdout)
# parse
dr = re.search(_durex, lines)
fr = re.search(_forex, lines)
data["bitrate"] = dr.group(5) if dr else 0
data["duration"] = parseDuration(dr.group(1), dr.group(2), dr.group(3), dr.group(4)) if dr else datetime.timedelta()
data['codec'] = fr.group(1) if fr else "h264"
data['width'] = int(fr.group(2)) if fr else 0
data['height'] = int(fr.group(3)) if fr else 0
data['vbr'] = int(fr.group(4)) if fr else 0
data['fps'] = float(fr.group(5)) if fr else 0
return data
#
# @func ffmpeg
# perform the actual conversion
#
def ffmpeg(in_file:str, out_file:str) :
indata = ffprobe(in_file)
scale = "-1:{}".format(str(min(indata['height'], MAX_SIZE_HEIGHT)))
cmd = "ffmpeg -progress /dev/stdout -y -threads {threads} -i {input_file} -vf 'scale={scale}' -c:v {codec} -crf {crf} -crf_max 32 -preset {preset} -c:a copy {output_file}".format(
input_file = '"{}"'.format(in_file),
output_file = '"{}"'.format(out_file),
scale = scale,
preset = PRESET,
crf = CRF,
#codec = str(indata["codec"]),
codec = CODEC,
threads = str(getThreads()))
frame_count = int(indata["fps"] * indata["duration"].total_seconds())
try:
process = subprocess.Popen(shlex.split(cmd), stdout = subprocess.PIPE, stderr = subprocess.PIPE )
while process.poll() is None:
line = ""
try :
line = process.stdout.readline().decode('utf-8')
progressBar(line, frame_count)
except:
break
time.sleep(0.01)
process.kill()
stdout, stderr = process.communicate()
return str(stderr)
except Exception as error:
print(f"Error while converting : {error}")
try:
process.kill()
process.communicate()
except :
pass
raise
#
# @func transcode
# will convert and check if successfully reduced size
#
def compress(video_file : str, output_folder : str) -> bool :
base_name, extension = os.path.splitext(os.path.basename(video_file))
new_file_name = "".join([base_name,FILE_TAG, extension]);
new_file = os.path.join(output_folder, new_file_name)
try :
ffmpeg(video_file, new_file)
old_size = getSize(video_file)
if getSize(new_file) in range( int( old_size * 0.1), int( old_size * COMP_FACTOR)) :
print(f"success : {video_file} was compressed to {new_file} !")
os.remove(video_file)
else :
print(f"Failed : {video_file} could not get compressed enough !")
try :
os.remove(new_file)
os.rename(video_file, new_file)
except:
pass
except Exception as error:
print(f"Error : {video_file} {error}!")
raise
# Run script if called directly
if __name__ == "__main__":
args = sys.argv[1:5]
try :
if "help" in args[0] :
help()
exit()
else :
video_folder = args[0]
output_folder = args[1]
all_vids = []
for ext in EXTENSIONS :
all_vids.extend(findFiles(video_folder, ext))
all_vids.sort(key = getSize) # sorts vids by size to always start with the easy ones
# compress all files
os.makedirs(output_folder, exist_ok=True)
for vid in all_vids :
if not FILE_TAG in vid or FILE_TAG == "" :
print(f"Compressing {vid} to {CODEC} with {getThreads()} threads")
compress(vid, output_folder)
else :
print(f"{vid} already optimised")
except Exception as error:
help()
print(f"error {error}")
raise error
else :
"that's all folks !"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment