Skip to content

Instantly share code, notes, and snippets.

@hhsprings
Last active October 12, 2021 03:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hhsprings/02f775b046e6efed6dff55a586d439ac to your computer and use it in GitHub Desktop.
Save hhsprings/02f775b046e6efed6dff55a586d439ac to your computer and use it in GitHub Desktop.
Shorthand for "ffmpeg ... -c:v libx265 ..."
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import sys
import re
import subprocess
import shutil
import tempfile
import logging
from glob import glob
_log = logging.getLogger()
_spcodes = {
"reset": "\x1b[39;49;00m",
# R:1 G Y B M C W:7
"target": "\x1b[33m", #"\x1b[42;30m",
"extb": "\x1b[44;37;1m",
"exta": "\x1b[44;37;1m",
"reduced": "\x1b[42;30m",
}
if sys.platform == "win32":
try:
import colorama
colorama.init()
except ImportError as e:
_spcodes = {k: "" for k, _ in _spcodes.items()}
if hasattr("", "decode"):
_encode = lambda s: s.encode(sys.getfilesystemencoding())
else:
_encode = lambda s: s
def _filter_args(*cmd):
    """
    Prepare a command line for subprocess.

    Falsy items (``None``, empty strings) are discarded and each remaining
    item is passed through ``_encode`` (which produces bytes on Python 2).
    Expects exactly one positional argument: the iterable of command items.
    """
    kept = filter(None, *cmd)
    return [_encode(item) for item in kept]
def check_call(*popenargs, **kwargs):
    """
    Run a command, wait for it, and raise if it exits with a failure.

    Modelled on ``subprocess.check_call`` with these differences:

    * The command is passed through ``_filter_args``, so ``None`` and
      empty items are omitted and items are encoded to bytes on Python 2.
    * ``psutil.Popen`` is used when psutil is importable, so that the
      priority of the child process can be lowered through ``nice()``.

    The command is taken from the ``args`` keyword or, when that is absent
    or ``None``, from the first positional argument.  ``timeout`` is given
    to ``wait()``; every other argument is forwarded to the ``Popen``
    constructor.

    Returns 0 on success.  Raises ``subprocess.CalledProcessError`` for a
    non-zero exit status, ``IndexError`` when no command is given at all,
    and ``TypeError`` when it is given both positionally and as ``args``.
    """
    try:
        import psutil
        popen = psutil.Popen
    except ImportError:
        popen = subprocess.Popen

    timeout = kwargs.pop("timeout", None)
    if kwargs.get("args") is not None:
        if popenargs:
            raise TypeError(
                "command given both positionally and as 'args'")
        # Pop it: leaving "args" in kwargs would hand the command to
        # Popen twice (once positionally, once by keyword).
        raw_cmd = kwargs.pop("args")
        extra = ()
    else:
        kwargs.pop("args", None)
        raw_cmd = popenargs[0]
        extra = popenargs[1:]
    cmd = _filter_args(raw_cmd)

    with popen(cmd, *extra, **kwargs) as proc:
        try:
            # 2<<5 == 64 is the Windows IDLE_PRIORITY_CLASS value.
            # NOTE(review): on POSIX psutil interprets the argument as a
            # niceness value -- confirm that is the intended behavior.
            if hasattr(proc, "nice"):
                proc.nice(2 << 5)
            retcode = proc.wait(timeout=timeout)
        except BaseException:
            # Including KeyboardInterrupt.  No second wait() here: leaving
            # the ``with`` block waits for the killed child.
            proc.kill()
            raise
    if retcode:
        raise subprocess.CalledProcessError(retcode, cmd)
    return 0
def check_stderroutput(*popenargs, **kwargs):
    """
    Run a command and return what it wrote to standard error (bytes).

    Unfortunately, ffmpeg and ffprobe write the information we want to
    the standard error output, and ``subprocess.check_output`` only
    captures standard output.  This is ``check_output`` rewritten for
    standard error; standard output is not captured.

    The command is taken from the ``args`` keyword or, when that is absent
    or ``None``, from the first positional argument, and is passed through
    ``_filter_args`` (``None``/empty items omitted, items encoded to bytes
    on Python 2).  Remaining arguments go to ``subprocess.Popen``.

    Raises ``ValueError`` if ``stderr`` is supplied,
    ``subprocess.CalledProcessError`` (with the captured standard error as
    ``output``) for a non-zero exit status, ``IndexError`` when no command
    is given, and ``TypeError`` when it is given both positionally and as
    ``args``.
    """
    if 'stderr' in kwargs:
        raise ValueError(
            'stderr argument not allowed, it will be overridden.')
    if kwargs.get("args") is not None:
        if popenargs:
            raise TypeError(
                "command given both positionally and as 'args'")
        # Pop it: leaving "args" in kwargs would hand the command to
        # Popen twice (once positionally, once by keyword).
        cmd = kwargs.pop("args")
        extra = ()
    else:
        kwargs.pop("args", None)
        cmd = popenargs[0]
        extra = popenargs[1:]
    filtered = _filter_args(cmd)

    with subprocess.Popen(
            filtered, *extra, stderr=subprocess.PIPE, **kwargs) as process:
        try:
            _, stderr_output = process.communicate()
        except BaseException:
            # Do not leave the child running when interrupted; leaving
            # the ``with`` block waits for it.
            process.kill()
            raise
        retcode = process.poll()
    if retcode:
        raise subprocess.CalledProcessError(
            retcode, list(cmd), output=stderr_output)
    return stderr_output
# Container file extensions (lower-case, with the leading dot) accepted as
# conversion output; they are also offered as the --out_container choices.
# FIXME: .avi, .wmv, ...?
SUPPORTED_CONTAINERS = [".mkv", ".mp4", ".webm"]
def _check_orig(ifn, enctxt):
    """
    Probe ``ifn`` with ffprobe and search its report for ``enctxt``.

    ``enctxt`` is a bytes regular expression; the result of searching the
    ffprobe standard error output for it is returned (a match object, or
    ``None`` when it does not occur).

    Raises ``ValueError`` when the report contains ``Video: ansi,``, which
    this script takes to mean that the file is not a video.
    """
    report = check_stderroutput(["ffprobe", "-hide_banner", ifn])
    not_a_video = re.search(br"Video: ansi,", report) is not None
    if not_a_video:
        raise ValueError("{} is not video.".format(ifn))
    return re.search(enctxt, report)
def _ffmpeg_for_conv(ifn, ofn, orig_same, encoder, fps, supress_ffmpeg_banner_opt):
    """
    Build the ffmpeg command line converting ``ifn`` to ``ofn`` and run it.

    The video stream is copied as-is only when ``orig_same`` is truthy, no
    ``fps`` is requested and ``encoder`` carries no ``-crf`` option;
    otherwise the items of ``encoder`` (codec name followed by its options)
    are used, plus an ``fps`` video filter when ``fps`` is set.  Audio is
    always copied and the metadata is dropped.

    ``supress_ffmpeg_banner_opt`` is placed right after ``ffmpeg``; an
    empty string is acceptable because ``check_call`` omits empty items.
    """
    may_copy = bool(orig_same) and not fps and "-crf" not in encoder
    if may_copy:
        video_opts = ["copy"]
    else:
        video_opts = list(encoder)
        if fps:
            video_opts += ["-vf", "fps={}".format(fps)]

    head = ["ffmpeg", supress_ffmpeg_banner_opt, "-y", "-i", ifn, "-c:v"]
    tail = ["-c:a", "copy", "-map_metadata", "-1", ofn]
    check_call(head + video_opts + tail)
def _get_argvideos(args):
from glob import glob
files = []
if args.videoargs_is_glob:
for pat in args.video:
files.extend(list(glob(pat)))
else:
files = args.video
order = args.processing_order
if order != "as_is":
if order.startswith("small"):
files.sort(key=lambda fn: os.stat(fn).st_size)
else:
files.sort(key=lambda fn: -os.stat(fn).st_size)
return files
def _stash_path(tmpdir, media):
    """Return the place below ``tmpdir`` where the original ``media`` is parked."""
    rel = os.path.normpath(media)
    escapes = (
        os.path.isabs(rel)
        or os.path.splitdrive(rel)[0]
        or rel == os.pardir
        or rel.startswith(os.pardir + os.sep)
    )
    if escapes:
        # os.path.join() throws tmpdir away for absolute paths, and a
        # leading ".." would climb out of it; rebuild such paths from the
        # drive-less absolute path so they always land inside tmpdir.
        tail = os.path.splitdrive(os.path.abspath(media))[1]
        rel = tail.lstrip(os.sep + (os.altsep or ""))
    return os.path.join(tmpdir, rel)


def _main():
    """
    Command-line entry point: re-encode the given videos with ffmpeg.

    For every input that ffprobe accepts, the original is moved into a
    fresh temporary directory and ffmpeg writes the converted file, with
    the chosen container extension, relative to the current directory.
    A summary with the size reduction is logged per file and again at the
    end.

    * ``--just_check_glob`` is a dry run: files are only probed and
      listed, nothing is moved, converted or deleted.
    * ``--remove_original`` deletes the parked originals afterwards;
      Ctrl-C cancels that deletion.
    * The temporary directory is removed when no file is left in it;
      otherwise its path is logged so the originals can be recovered.

    If ffmpeg fails, the exception propagates after this cleanup and the
    parked original stays in the temporary directory.
    """
    _LOGFMT = "%(asctime)s:%(levelname)5s:%(module)s(%(lineno)d):%(thread)x| %(message)s"
    logging.basicConfig(
        stream=sys.stderr, level=logging.INFO, format=_LOGFMT)
    import argparse
    ap = argparse.ArgumentParser()
    ap.add_argument("video", nargs="+")
    ap.add_argument(
        "--videoencoder",
        choices=["libx265", "libvpx-vp9"], default="libx265")
    ap.add_argument("--fps", type=float)
    ap.add_argument(
        "--crf_x265", type=float,
        help="if this is not specified, ffmpeg seems to use -crf 28 as default.")
    ap.add_argument("--crf_vp9", default="40")
    ap.add_argument(
        "--out_container",
        choices=SUPPORTED_CONTAINERS + ["KEEP"], default="KEEP")
    ap.add_argument("--browse_wd", help="ex. 'explorer'")
    ap.add_argument("--remove_original", action="store_true")
    ap.add_argument("--videoargs_is_glob", action="store_true")
    ap.add_argument("--donttouch_if_same_codec", action="store_true")
    ap.add_argument("--just_check_glob", action="store_true")
    ap.add_argument("--supress_ffmpeg_banner", action="store_true")
    ap.add_argument(
        "--processing_order",
        choices=[
            "as_is",
            "smallersize_first",
            "largersize_first",
        ], default="as_is")
    args = ap.parse_args()

    curdir = os.path.abspath(os.curdir)
    # A dry run moves nothing, so it needs no holding directory.
    tmpdir = "" if args.just_check_glob else tempfile.mkdtemp()
    if args.browse_wd:
        for folder in filter(None, (curdir, tmpdir)):
            try:
                check_call([args.browse_wd, folder])
            except Exception as exc:
                # Best effort only: the browser may be missing, or report
                # a non-zero status even though it opened the folder.
                _log.debug(
                    "%r %r failed: %r", args.browse_wd, folder, exc)

    # Work on a copy so the module-level list survives repeated calls.
    containers = list(SUPPORTED_CONTAINERS)
    if args.videoencoder == "libx265":
        enc, enctxt = "h.265", br" Video: hevc "
        encoder = [args.videoencoder]
        # Compare with None: 0 is a value the user may ask for explicitly.
        if args.crf_x265 is not None:
            encoder.extend(["-crf", "{}".format(args.crf_x265)])
        containers.remove(".webm")
    else:
        enc, enctxt = "vp9", br" Video: vp9 "
        encoder = [args.videoencoder, "-crf", args.crf_vp9]
    encoder.extend(["-max_muxing_queue_size", "2048"])

    def _processed_infoout(ifn, ofn):
        # One colorized summary line: output name, both extensions and
        # sizes, and the output size as a percentage of the input size.
        in_size = os.stat(ifn).st_size
        out_size = os.stat(ofn).st_size
        return "{}{}{} ('{}{}{}' {:3,d} bytes -> '{}{}{}' {:3,d} bytes ({}{:.2f}{} %))".format(
            _spcodes["target"],
            os.path.basename(ofn),
            _spcodes["reset"],
            _spcodes["extb"],
            os.path.splitext(ifn)[1],
            _spcodes["reset"],
            in_size,
            _spcodes["exta"],
            os.path.splitext(ofn)[1],
            _spcodes["reset"],
            out_size,
            _spcodes["reduced"],
            out_size / in_size * 100,
            _spcodes["reset"])

    supress_ffmpeg_banner_opt = "-hide_banner" if args.supress_ffmpeg_banner else ""
    # (parked original, output file, summary line) per processed file.
    done = []
    try:
        for media in _get_argvideos(args):
            try:
                orig_same = _check_orig(media, enctxt)
            except Exception as e:
                _log.error(e)
                continue
            if orig_same and args.donttouch_if_same_codec:
                _log.info("ignoring: %s", media)
                continue

            if args.just_check_glob:
                ifn, ofn = media, media
            else:
                ext = args.out_container
                if ext == "KEEP":
                    ext = os.path.splitext(media.lower())[1]
                if ext not in containers:
                    # Should we do either whitelist or blacklist test?
                    _log.info(
                        "ignoring: '%s' because '%s' does not support %s",
                        media, ext, enc)
                    continue

                # Park the original in tmpdir; the converted file then
                # takes its place (or a sibling with a new extension).
                ifn = _stash_path(tmpdir, media)
                stash_dir = os.path.dirname(ifn)
                if not os.path.exists(stash_dir):
                    os.makedirs(stash_dir)
                shutil.copyfile(media, ifn)
                os.remove(media)

                ofn = os.path.join(curdir, os.path.splitext(media)[0] + ext)
                _ffmpeg_for_conv(
                    ifn, ofn,
                    orig_same, encoder, args.fps,
                    supress_ffmpeg_banner_opt)
            summary = _processed_infoout(ifn, ofn)
            _log.info("%s", summary)
            done.append((ifn, ofn, summary))
    except KeyboardInterrupt:
        # An interrupted run must never delete originals.
        args.remove_original = False
    finally:
        if done:
            _log.info("processed:\n %s", "\n ".join(
                summary for _, _, summary in done))
        # In a dry run `done` lists the untouched input files and there is
        # no temporary directory, so neither deletion nor cleanup applies.
        if not args.just_check_glob:
            if args.remove_original:
                for ifn, _, _ in done:
                    try:
                        os.remove(ifn)
                    except OSError as erm:
                        _log.warning("cannot remove %r: %r", ifn, erm)
            leftovers = any(names for _, _, names in os.walk(tmpdir))
            if leftovers:
                _log.info("originals: %s", tmpdir)
            else:
                # Only (possibly nested) empty directories remain.
                shutil.rmtree(tmpdir)
# Script entry point: run the command-line interface only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment