Last active
October 5, 2020 03:30
-
-
Save hhsprings/eda85552c8b11128ba244d9cf333bb77 to your computer and use it in GitHub Desktop.
Splitting a video at blank (black and silent) sections using ffmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals | |
from __future__ import division | |
from __future__ import print_function | |
import re | |
import io | |
import os | |
import sys | |
import subprocess | |
import hashlib | |
import json | |
import pipes | |
from mako.template import Template # pip install mako | |
if hasattr("", "decode"): # python 2 | |
def _encode(s): | |
return s.encode(sys.getfilesystemencoding()) | |
def _decode(s): | |
return s.decode(sys.getfilesystemencoding()) | |
else: | |
def _encode(s): | |
return s | |
def _decode(s): | |
return s | |
def _filter_args(*cmd):
    """
    Build the argument list handed to ``subprocess``.

    The positional arguments are forwarded to ``filter(None, ...)``, so the
    usual call is ``_filter_args(arg_list)``.  Falsy items (``None``, but
    also empty strings) are dropped, and every remaining item goes through
    ``_encode`` (bytes on Python 2, unchanged on Python 3).
    """
    return [_encode(item) for item in filter(None, *cmd)]
def check_call(*popenargs, **kwargs):
    """
    Forward to ``subprocess.check_call`` after normalising the command.

    The command is taken from the ``args`` keyword when present, otherwise
    from the first positional argument.  Before it is executed:

    * its items are encoded to bytes on Python 2, and
    * falsy items such as ``None`` are dropped.

    Raises ``subprocess.CalledProcessError`` when the command exits with a
    non-zero status.
    """
    # pop (not get): leaving "args" in kwargs would hand it to
    # subprocess.check_call a second time and raise TypeError.
    cmd = kwargs.pop("args", None)
    if cmd is None:
        cmd = popenargs[0]
    subprocess.check_call(
        _filter_args(cmd), **kwargs)
def check_stderroutput(*popenargs, **kwargs):
    """
    Run a command and return what it wrote to standard error.

    ffmpeg and ffprobe print the information we want on standard error,
    which ``subprocess.check_output`` does not capture.  This is
    ``check_output`` rewritten for standard error.

    The command is taken from the ``args`` keyword when present, otherwise
    from the first positional argument.  Before it is executed:

    * its items are encoded to bytes on Python 2, and
    * falsy items such as ``None`` are dropped.

    Returns the captured standard error as bytes.

    Raises ``ValueError`` if a ``stderr`` keyword is supplied, and
    ``subprocess.CalledProcessError`` (with the captured standard error as
    its ``output``) when the command exits with a non-zero status.
    """
    if 'stderr' in kwargs:
        raise ValueError(
            'stderr argument not allowed, it will be overridden.')
    # pop (not get): leaving "args" in kwargs would pass it to Popen twice
    # and raise TypeError.
    cmd = kwargs.pop("args", None)
    if cmd is None:
        cmd = popenargs[0]
    #
    process = subprocess.Popen(
        _filter_args(cmd),
        stderr=subprocess.PIPE,
        **kwargs)
    # stdout is not piped here, so only the stderr half carries data.
    _, stderr_output = process.communicate()
    retcode = process.poll()
    if retcode:
        raise subprocess.CalledProcessError(
            retcode, list(cmd), output=stderr_output)
    return stderr_output
class _DetInvokator(object):
    """
    Run one ffmpeg ``*detect`` filter over a file and parse its metadata dump.

    The filter output is written by ffmpeg's ``metadata``/``ametadata``
    filter into a file under ``.det_tmp``; once ffmpeg succeeds the file is
    renamed to ``<name>.fin`` and reused by later runs unless the caller
    asks to bypass the cache.
    """

    # (filter option, filter-graph template, option disabling the other
    # stream kind), keyed by stream kind.  The graph template is formatted
    # twice: first with the detector name, then with the substitutions.
    _filt_args_tmpl = {
        "a": ("-af",
              "{{prep_filt}}{}detect={{det_paras}},ametadata=mode=print:file={{det_resfn}}",
              "-vn"),
        "v": ("-vf",
              "{{prep_filt}}{}detect={{det_paras}},metadata=mode=print:file={{det_resfn}}",
              "-an"),
    }
    # detection type -> (stream kind, detector name prefix)
    _filt_args_typmap = {
        "S": ("a", "silence"),
        "B": ("v", "black"),
        "F": ("v", "freeze"),
    }

    def __init__(self, infn, typ, det_paras, prep_filt, dont_use_cache):
        """
        Prepare the filter arguments and the cache file names.

        infn: input media file name.
        typ: key of ``_filt_args_typmap`` ("S", "B" or "F").
        det_paras: dict of detector parameters, rendered as ``k=v:k=v``.
        prep_filt: filter chain to run before the detector ("" for none).
        dont_use_cache: when true, an existing result file is deleted and
            the detection is run again.
        """
        self.infn = infn
        self._typ = typ

        workdir = ".det_tmp"
        if not os.path.exists(workdir):
            os.mkdir(workdir)

        def _joined(mapping):
            # "k1=v1:k2=v2", keys in sorted order so the text is stable
            return ":".join(
                ["%s=%s" % pair for pair in sorted(mapping.items())])

        # normalise the pre-filter so it carries exactly one trailing comma
        if prep_filt.endswith(","):
            prep_filt = prep_filt[:-1]
        _subst = {
            "det_paras": _joined(det_paras),
            "prep_filt": (prep_filt + ",") if prep_filt else "",
        }

        # The cache key covers the input name and every substitution made
        # so far (the result file name itself is added afterwards).
        cache_key = hashlib.md5(
            (infn + "," + _joined(_subst)).encode("utf-8")).hexdigest()
        self._tmpfn = workdir + "/" + self._typ + cache_key
        _subst["det_resfn"] = self._tmpfn

        stream_kind, detector = self._filt_args_typmap[self._typ]
        filt_opt, graph_tmpl, drop_opt = self._filt_args_tmpl[stream_kind]
        self._filt_args = [
            filt_opt,
            graph_tmpl.format(detector).format(**_subst),
            drop_opt,
        ]

        self._resfn = self._tmpfn + ".fin"
        already_there = os.path.exists(self._resfn)
        if dont_use_cache:
            # forced re-detection: discard any previous result
            if already_there:
                os.remove(self._resfn)
            self._exists = False
        else:
            self._exists = already_there

        # .par: not for the machinery but for humans
        with io.open(self._tmpfn + ".par", "w") as fo:
            fo.write(_decode(json.dumps({infn: _subst}, sort_keys=True)))

    def execute(self, ffmpeg, unsplittables):
        """
        Run the detection (unless a cached result exists) and parse it.

        ffmpeg: the ffmpeg executable to invoke.
        unsplittables: ranges passed through to
            ``_get_ranges_from_detectmeta``.

        Returns the parsed ranges, or an empty list when ffmpeg fails (the
        error is reported on standard error).
        """
        if not self._exists:
            filt_opt, graph, drop_opt = (
                self._filt_args[0], self._filt_args[1], self._filt_args[2])
            command = [
                ffmpeg, "-hide_banner", "-y", "-i", self.infn,
                filt_opt, graph, drop_opt,
                "-f", "null", "-",
            ]
            try:
                check_call(command)
                # rename only after success, so a ".fin" file is always a
                # complete result
                os.rename(self._tmpfn, self._resfn)
            except subprocess.CalledProcessError as e:
                print(repr(e), file=sys.stderr)
                return []
        detector = self._filt_args_typmap[self._typ][1]
        return _get_ranges_from_detectmeta(
            self._resfn, detector, unsplittables)
def _ts_to_tss(ts, frac=3): | |
d, _, f = (("%%.%df" % frac) % ts).partition(".") | |
d = abs(int(d)) | |
ss_h = int(d / 3600) | |
d -= ss_h * 3600 | |
ss_m = int(d / 60) | |
d -= ss_m * 60 | |
ss_s = int(d) | |
return "%s%02d:%02d:%02d.%s" % ( | |
"" if ts >= 0 else "-", | |
ss_h, ss_m, ss_s, f) | |
def _tss_to_ts(tss): | |
try: | |
return float(tss) | |
except ValueError: | |
import math | |
dp, _, fp = tss.partition(".") | |
dp = map(int, dp.split(":")) | |
dp = int(sum([p * math.pow(60, 2 - i) for i, p in enumerate(dp)])) | |
if fp: | |
dp = "%s.%s" % (dp, fp) | |
return float(dp) | |
def _in_range(t, s, e): | |
return (s <= t and t <= e) | |
def _contains(r1, r2): | |
r""" | |
>>> _contains([1, 3], [0, 4]) | |
True | |
>>> _contains([1, 3], [1, 3]) | |
True | |
>>> _contains([1, 3], [1, 2]) | |
False | |
>>> _contains([1, 3], [2, 3]) | |
False | |
>>> _contains([1, 4], [2, 3]) | |
False | |
""" | |
return (_in_range(r1[0], r2[0], r2[1]) and \ | |
_in_range(r1[1], r2[0], r2[1])) | |
def _ranges_overlapped(r1, r2): | |
r""" | |
>>> # r1 o-----o | |
>>> # r2 o-----o | |
>>> _ranges_overlapped([1, 2], [1, 2]) | |
True | |
>>> # r1 o-----o | |
>>> # r2 o-----o | |
>>> _ranges_overlapped([1, 2], [1.5, 2.5]) | |
True | |
>>> # r1 o-----o | |
>>> # r2 o-----o | |
>>> _ranges_overlapped([1.5, 2.5], [1, 2]) | |
True | |
>>> # r1 o---o | |
>>> # r2 o-----o | |
>>> _ranges_overlapped([1.5, 2], [1, 2.5]) | |
True | |
>>> # r1 o-----o | |
>>> # r2 o---o | |
>>> _ranges_overlapped([1, 2.5], [1.5, 2]) | |
True | |
>>> # r1 o-----o | |
>>> # r2 o---o | |
>>> _ranges_overlapped([1, 2], [2, 3]) | |
True | |
>>> # r1 o-----o | |
>>> # r2 o---o | |
>>> _ranges_overlapped([1, 2], [2.5, 3]) | |
False | |
>>> # r1 o---o | |
>>> # r2 o-----o | |
>>> _ranges_overlapped([2.5, 3], [1, 2]) | |
False | |
""" | |
return \ | |
_in_range(r1[0], r2[0], r2[1]) or \ | |
_in_range(r1[1], r2[0], r2[1]) or \ | |
_in_range(r2[0], r1[0], r1[1]) or \ | |
_in_range(r2[1], r1[0], r1[1]) | |
def _extract_in_range(tsrlist, rnglist, neg=False):
    r"""
    Keep (or, with *neg*, cut away) the parts of ranges that fall in *rnglist*.

    tsrlist: iterable of ``[start, end]`` ranges to filter.
    rnglist: list of ``[start, end]`` ranges to test against.
    neg: when false, return the parts of each range lying inside any range
        of *rnglist*; when true, return each range with the first
        overlapping range of *rnglist* cut away (ranges lying entirely
        inside one are dropped).

    Duplicates are not added twice.  With *neg*, only the first range of
    *rnglist* that overlaps is applied, and when that range lies strictly
    inside the tested one only the leading part is kept.

    >>> t = [[1, 2], [3, 6], [8, 9], [9, 11], [11, 15]]
    >>> _extract_in_range(t, [[5, 11]])
    [[5, 6], [8, 9], [9, 11], [11, 11]]
    >>> _extract_in_range(t, [[2, 4], [5, 11]])
    [[2, 2], [3, 4], [5, 6], [8, 9], [9, 11], [11, 11]]
    >>> _extract_in_range(t, [[5, 11], [5, 11]])
    [[5, 6], [8, 9], [9, 11], [11, 11]]
    >>> #
    >>> _extract_in_range(t, [[5, 11]], neg=True)
    [[1, 2], [3, 5], [11, 15]]
    >>> _extract_in_range(t, [[5, 10]], neg=True)
    [[1, 2], [3, 5], [10, 11], [11, 15]]
    """
    picked = []

    def _keep(item):
        if item not in picked:
            picked.append(item)

    def _clip(span):
        # positive mode: intersect the span with every range
        for rng in rnglist:
            if _contains(span, rng):
                _keep(span)
            elif _ranges_overlapped(span, rng):
                _keep([max(span[0], rng[0]), min(span[1], rng[1])])

    def _cut(span):
        # negative mode: the first range touching the span decides
        for rng in rnglist:
            if _contains(span, rng):
                return
            if _ranges_overlapped(span, rng):
                trimmed = list(span)
                if _in_range(rng[0], span[0], span[1]):
                    trimmed[1] = rng[0]
                elif _in_range(rng[1], span[0], span[1]):
                    trimmed[0] = rng[1]
                _keep(trimmed)
                return
        _keep(span)

    for span in tsrlist:
        if not neg:
            _clip(span)
        else:
            _cut(span)
    return picked
def _norm_tsrange_list(rl, dur, len_if_single=2.0):
    r"""
    Normalise a list of range specifications to ``(start, end)`` seconds.

    rl: list whose items are either a ``[start, end]`` pair, a one-element
        list ``[point]``, or a bare ``point``; times may be numbers or time
        strings understood by ``_tss_to_ts``.  Empty lists are skipped.
    dur: upper bound applied to every end time.
    len_if_single: width of the range centred on a single point.

    Start times are clamped to 0.0 and end times to *dur*.  A single point
    at time 0 is valid and yields a range starting at 0.0.

    >>> _print = lambda lst: print(
    ...     ", ".join(["[%.3f, %.3f]" % (s, e) for s, e in lst]))
    >>> _print(_norm_tsrange_list([], 5))
    <BLANKLINE>
    >>> _print(_norm_tsrange_list([[]], 5))
    <BLANKLINE>
    >>> _print(_norm_tsrange_list([[1, 2], [3, 4]], 5))
    [1.000, 2.000], [3.000, 4.000]
    >>> _print(_norm_tsrange_list([[1], [3, 4]], 5))
    [0.000, 2.000], [3.000, 4.000]
    >>> _print(_norm_tsrange_list([1, [3, 4]], 5))
    [0.000, 2.000], [3.000, 4.000]
    >>> _print(_norm_tsrange_list([0, [3, 4]], 5))
    [0.000, 1.000], [3.000, 4.000]
    >>> _print(_norm_tsrange_list(["00:00:01.1", [3, 4]], 5))
    [0.100, 2.100], [3.000, 4.000]
    >>> _print(_norm_tsrange_list(["00:00:01.1", ["00:00:03", 4]], 5))
    [0.100, 2.100], [3.000, 4.000]
    >>> _print(_norm_tsrange_list(["00:00:01.1", ["00:00:03", 4]], 3.5))
    [0.100, 2.100], [3.000, 3.500]
    >>> _print(_norm_tsrange_list(["00:00:01.1", ["00:00:03", 4]], 3.5, 1.0))
    [0.600, 1.600], [3.000, 3.500]
    """
    result = []
    half = len_if_single / 2
    for item in rl:
        start, mid, end = None, None, None
        if isinstance(item, (list, tuple,)):
            if not item:
                continue
            if len(item) >= 2:
                start, end = item[:2]
            else:
                mid = _tss_to_ts(item[0])
        else:
            mid = _tss_to_ts(item)
        # compare against None: a midpoint of 0.0 is falsy but valid
        if mid is not None:
            start, end = mid - half, mid + half
        result.append(
            (max(0.0, _tss_to_ts(start)), min(_tss_to_ts(end), dur)))
    return result
def _get_ranges_from_detectmeta(detmf, kind, unsplittables):
    """
    Read detected ``[start, end]`` ranges from a metadata print file.

    detmf: path of the file written by the ``metadata=mode=print`` filter.
    kind: detector name prefix; lines of the form
        ``lavfi.<kind>_start=<secs>`` and ``lavfi.<kind>_end=<secs>``
        are used, everything else is ignored.
    unsplittables: ranges cut away from the detected ones (see
        ``_extract_in_range`` with ``neg=True``).

    A start without a matching end (or any entry that does not end up with
    exactly two values) is discarded, and an end seen before any start is
    ignored.
    """
    _re_s = re.compile(r"^lavfi.%s_start=(.*)$" % kind)
    _re_e = re.compile(r"^lavfi.%s_end=(.*)$" % kind)
    spans = []
    with io.open(detmf) as fi:
        for line in fi:
            line = line.rstrip()
            m = _re_s.match(line)
            if m:
                spans.append([float(m.group(1))])
                continue
            m = _re_e.match(line)
            # an end needs an open entry to attach to; skip it otherwise
            # instead of failing on an empty list
            if m and spans:
                spans[-1].append(float(m.group(1)))
    return _extract_in_range(
        [it for it in spans if len(it) == 2],
        unsplittables, neg=True)
def _get_splitpoints(black_rl, silence_rl, thr, remove_ranges, split_pos):
    """
    Compute the sorted, de-duplicated split times.

    black_rl, silence_rl: lists of ``(start, end)`` ranges.
    thr: minimum length of a black/silence intersection to be used.
    remove_ranges: list of ``[start, end]`` ranges; both ends of every
        range become split points.  NOTE: this list is extended in place
        when *split_pos* has two entries.
    split_pos: one or two fractions of the intersection; one fraction gives
        a single split point, two give a start and an end point whose range
        is also appended to *remove_ranges*.
    """
    points = []
    search_from = 0
    for bl_s, bl_e in black_rl:
        for s_ix in range(search_from, len(silence_rl)):
            si_s, si_e = silence_rl[s_ix]
            if not _ranges_overlapped((bl_s, bl_e), (si_s, si_e)):
                continue
            s, e = max(bl_s, si_s), min(bl_e, si_e)
            width = e - s
            if width < thr:
                continue
            if len(split_pos) == 1:
                points.append(s + width * split_pos[0])
            else:
                bs, be = s + width * split_pos[0], s + width * split_pos[1]
                points.extend((bs, be))
                remove_ranges.append([bs, be])
            # NOTE(review): the nesting of the next two statements was
            # reconstructed (taken to apply only once a long enough
            # intersection is found) -- confirm against the upstream file.
            search_from = s_ix
            break
    if remove_ranges and remove_ranges[0]:
        for rr_s, rr_e in remove_ranges:
            points.extend((rr_s, rr_e))
    return sorted(set(points))
def _get_part_ranges(
        black_rl, silence_rl, thr, dur, trim_s, trim_e, remove_ranges, min_duration, split_pos):
    r"""
    Yield the ``(start, end)`` of every part the input is split into.

    The split points come from ``_get_splitpoints``; the resulting pieces
    are restricted to ``[trim_s, trim_e]``, pieces lying inside a range of
    *remove_ranges* are skipped, and a piece shorter than *min_duration*
    (unless it ends at *trim_e*) is merged into the piece that follows it.

    >>> _print = lambda lst: print(
    ...     ", ".join(["[%.2f, %.2f]" % (s, e) for s, e in list(lst)]))
    >>> dur = 20
    >>> _print(
    ...     _get_part_ranges(
    ...         black_rl=[[3, 7]],
    ...         silence_rl=[[4, 6]],
    ...         thr=0.5, dur=dur, trim_s=0, trim_e=dur,
    ...         remove_ranges=[],
    ...         min_duration=0.1, split_pos=[0.5]))
    [0.00, 5.00], [5.00, 20.00]
    >>> _print(
    ...     _get_part_ranges(
    ...         black_rl=[[3, 7]],
    ...         silence_rl=[[4, 6]],
    ...         thr=0.5, dur=dur, trim_s=0, trim_e=dur,
    ...         remove_ranges=[[3.5, 4.5]],
    ...         min_duration=0.1, split_pos=[0.5]))
    [0.00, 3.50], [4.50, 5.00], [5.00, 20.00]
    >>> _print(
    ...     _get_part_ranges(
    ...         black_rl=[[3, 7]],
    ...         silence_rl=[[4, 6]],
    ...         thr=0.5, dur=dur, trim_s=0, trim_e=dur,
    ...         remove_ranges=[[3.5, 6]],
    ...         min_duration=0.1, split_pos=[0.5]))
    [0.00, 3.50], [6.00, 20.00]
    >>> _print(
    ...     _get_part_ranges(
    ...         black_rl=[[3, 7]],
    ...         silence_rl=[[4, 6]],
    ...         thr=0.5, dur=dur, trim_s=1.0, trim_e=dur-5,
    ...         remove_ranges=[[3.5, 6]],
    ...         min_duration=0.1, split_pos=[0.5]))
    [1.00, 3.50], [6.00, 15.00]
    >>> remove_ranges=[]
    >>> _print(
    ...     _get_part_ranges(
    ...         black_rl=[[3, 7]],
    ...         silence_rl=[[4, 6]],
    ...         thr=0.5, dur=dur, trim_s=0, trim_e=dur,
    ...         remove_ranges=remove_ranges,
    ...         min_duration=0.1, split_pos=[0, 1]))
    [0.00, 4.00], [6.00, 20.00]
    """
    cuts = _get_splitpoints(
        black_rl, silence_rl, thr, remove_ranges, split_pos)
    # consecutive cut points form the candidate pieces
    pieces = _extract_in_range(
        zip([0.0] + cuts, cuts + [dur]),
        [[trim_s, trim_e]], neg=False)
    carried = []  # starts of too-short pieces awaiting a longer successor
    for (s, e) in pieces:
        if (remove_ranges and remove_ranges[0]
                and any((_contains((s, e), rr) for rr in remove_ranges))):
            continue
        too_short = e - s < min_duration and e < trim_e
        if too_short:
            carried.append(s)
            continue
        if carried:
            # extend this piece back to the first pending short piece
            s = carried[0]
            carried = []
        yield (s, e)
class _ControlParams(object):
    """
    Command-line arguments turned into the values the rest of the script uses.

    Every public attribute of the parsed arguments is copied onto the
    instance; the options named in ``jsonkeys`` are decoded from JSON text
    first.  Further attributes are derived from them: ``outfilebase``,
    ``ext``, ``dur``, ``trim_s``, ``trim_e``, ``vfs_common`` and
    ``afs_common``.
    """

    # options whose command-line value is JSON text
    jsonkeys = [
        "sdet_paras", "bdet_paras",
        "unsplittables", "remove_ranges",
        "additional_split_points",
        "ranges_using_silence_only",
        "ranges_using_black_only",
        "split_pos",
    ]

    def __init__(self, args):
        """
        args: parsed arguments (see ``_build_argparser_common``).

        Raises ``OSError`` when the input file cannot be stat'ed and
        ``ValueError`` when a JSON option is malformed, ``split_pos`` does
        not have one or two entries, or the duration cannot be found in
        ffprobe's output.
        """
        os.stat(args.infile)  # fail early if the input is not accessible
        for n in [n for n in dir(args) if not n.startswith("_")]:
            v = getattr(args, n)
            if n in self.jsonkeys:
                try:
                    v = json.loads(v)
                except ValueError as e:
                    # json.JSONDecodeError (Python 3) is a ValueError
                    # subclass, and Python 2 raises plain ValueError.
                    raise ValueError("--{}: {}".format(n, repr(e)))
            setattr(self, n, v)
        if not isinstance(self.split_pos, (list,)):
            self.split_pos = [self.split_pos]
        if len(self.split_pos) < 1 or 2 < len(self.split_pos):
            raise ValueError("len(split_pos) must be 1 or 2")
        # the detector falls back to the common pre-filters
        if not self.af_det_prep:
            self.af_det_prep = self.af_prep
        if not self.vf_det_prep:
            self.vf_det_prep = self.vf_prep
        #
        self.outfilebase, self.ext = os.path.splitext(args.infile)
        if args.outfilebase:
            self.outfilebase, self.ext = os.path.splitext(args.outfilebase)
        if args.destdir:
            self.outfilebase = os.path.join(
                args.destdir, self.outfilebase).replace("\\", "/")
        #
        ffprres = check_stderroutput(["ffprobe", "-hide_banner", args.infile])
        # decode leniently: only the ASCII "Duration:" field is needed, and
        # the rest of the output may contain bytes in another encoding
        found = re.search(
            r" Duration: ([0-9:.]+), ", ffprres.decode("utf-8", "replace"))
        if found is None:
            raise ValueError(
                "could not find the duration of {!r} in ffprobe output".format(
                    args.infile))
        self.dur = _tss_to_ts(found.group(1))
        #
        self.trim_s = 0.0
        if args.trim_s:
            self.trim_s = _tss_to_ts(args.trim_s)
        self.trim_e = self.dur
        if args.trim_e:
            self.trim_e = _tss_to_ts(args.trim_e)
        #
        self.unsplittables = _norm_tsrange_list(
            self.unsplittables, self.dur)
        self.remove_ranges = _norm_tsrange_list(
            self.remove_ranges, self.dur)
        # an additional split point is handled as a zero-length remove range
        asps = self.additional_split_points
        if asps:
            self.remove_ranges.extend(
                _norm_tsrange_list([
                    [m, m] for m in asps], self.dur))
        #
        self.ranges_using_silence_only = _norm_tsrange_list(
            self.ranges_using_silence_only, self.dur)
        self.ranges_using_black_only = _norm_tsrange_list(
            self.ranges_using_black_only, self.dur)
        #
        self.vfs_common = list(filter(None, [self.vf_prep]))
        self.afs_common = list(filter(None, [self.af_prep]))
def _build_argparser_common(): | |
import argparse | |
ap = argparse.ArgumentParser() | |
ap.add_argument("infile") | |
ap.add_argument("--outfilebase") | |
ap.add_argument("--destdir") | |
ap.add_argument("--seqno_format", default="-%02d") | |
ap.add_argument("--ffmpeg", default='ffmpeg') | |
ap.add_argument("--if_only_one_side", action="store_true") | |
ap.add_argument( | |
"--sdet_paras", | |
default='{"d": 0.2, "n": 18e-4}') | |
ap.add_argument( | |
"--bdet_paras", | |
default='{"d": 0.2, "pic_th": 0.982, "pix_th": 15e-3}') | |
ap.add_argument("--sdet_force", action="store_true") | |
ap.add_argument("--bdet_force", action="store_true") | |
ap.add_argument( | |
"--unsplittables", | |
default='[[0.0, 5.0]]') | |
ap.add_argument( | |
"--duration_threshold", | |
type=float, default=0.7) | |
ap.add_argument( | |
"--split_pos", default="[0.5]") | |
ap.add_argument("--vf_prep", default='') | |
ap.add_argument("--af_prep", default='') | |
# only for detector | |
ap.add_argument("--vf_det_prep", default='') # if specified, --vf_prep will be ignored for detector. | |
ap.add_argument("--af_det_prep", default='') # if specified, --af_prep will be ignored for detector. | |
# -------------------- | |
ap.add_argument("--trim_s", default="") | |
ap.add_argument("--trim_e", default="") | |
ap.add_argument("--remove_ranges", default="[[]]") | |
ap.add_argument("--additional_split_points", default="[]") | |
ap.add_argument("--ranges_using_silence_only", default="[[]]") | |
ap.add_argument("--ranges_using_black_only", default="[[]]") | |
ap.add_argument("--preamble", default="") | |
ap.add_argument("--postamble", default="") | |
ap.add_argument("--min_duration", type=float, default=12.0) | |
return ap | |
class _DetectorResult(object):
    """
    Detected black and silence ranges of the input, plus the part ranges
    derived from them.

    ``params`` holds the ``_ControlParams`` built from the arguments;
    ``black_rl`` and ``silence_rl`` are sorted lists of ``(start, end)``
    tuples without duplicates.
    """

    def __init__(self, args):
        """Run both detectors for the input named by *args*."""
        params = _ControlParams(args)
        self.params = params
        source = _decode(params.infile)

        def _detect(typ, det_paras, prep_filt, force):
            return _DetInvokator(
                source, typ, det_paras, prep_filt, force).execute(
                    params.ffmpeg, self.params.unsplittables)

        # silence first, then black
        silence = _detect(
            "S", params.sdet_paras, params.af_det_prep, params.sdet_force)
        black = _detect(
            "B", params.bdet_paras, params.vf_det_prep, params.bdet_force)

        if self.params.if_only_one_side:
            # let an empty detection result borrow the other one
            if not black:
                black = silence
            if not silence:
                silence = black

        silence_only = self.params.ranges_using_silence_only
        black_only = self.params.ranges_using_black_only
        if silence_only:
            # inside these ranges the silence result stands in for black
            black = _extract_in_range(black, silence_only, neg=True)
            black.extend(_extract_in_range(silence, silence_only, neg=False))
        if black_only:
            # inside these ranges the black result stands in for silence
            silence = _extract_in_range(silence, black_only, neg=True)
            silence.extend(_extract_in_range(black, black_only, neg=False))

        self.black_rl = sorted(set([tuple(it) for it in black]))
        self.silence_rl = sorted(set([tuple(it) for it in silence]))

    def get_part_ranges(self, kfs=0):
        """
        Yield ``(i, is_first, is_last), (s, e), (ss1, ss2, to)`` per part.

        ``i`` is the part index; ``is_first``/``is_last`` tell whether the
        part starts at ``trim_s`` / ends at ``trim_e``; ``(s, e)`` is the
        part in input time.  ``ss1`` and ``ss2`` are two seek offsets that
        add up to ``s`` (``ss2`` is at most *kfs*, or None when ``s`` is not
        positive) and ``to`` is ``e - ss1``.

        ex.
          -ss "01:00:15" -to "01:00:30"
            => -ss "01:00:10" -ss "00:00:05" -to "00:00:20"
        """
        spans = _get_part_ranges(
            self.black_rl, self.silence_rl,
            self.params.duration_threshold, self.params.dur,
            self.params.trim_s, self.params.trim_e,
            self.params.remove_ranges, self.params.min_duration,
            self.params.split_pos)
        for i, (s, e) in enumerate(spans):
            if s > kfs:
                ss1, ss2 = s - kfs, kfs
            elif s > 0:
                ss1, ss2 = 0.0, s
            else:
                ss1, ss2 = 0.0, None
            flags = (i, s == self.params.trim_s, e == self.params.trim_e)
            yield flags, (s, e), (ss1, ss2, e - ss1)
class _Renderer(object):
    """
    Render the output script from a Mako template.

    Two template defs, ``bsh_common_header()`` and ``bsh_common_footer()``,
    are always put in front of the caller's template body.
    """

    def __init__(self, params, bodytempl=""):
        """
        params: object exposed to the template as ``params``.
        bodytempl: Mako template text appended after the common defs.
        """
        self.params = params
        # NOTE(review): the template relies on ``pipes.quote``; the
        # ``pipes`` module was removed from the standard library in
        # Python 3.13.
        common_defs = """<%def name="bsh_common_header()">#! /bin/sh
ffmpeg="<%text>${ffmpeg:-</%text>${params.ffmpeg}<%text>}</%text>"
inmov=${pipes.quote(params.infile)}
outbase=${pipes.quote(params.outfilebase)}
ext="${params.ext[1:]}"\\
% if params.preamble:
#
${params.preamble}
% endif
</%def><%def name="bsh_common_footer()">\\
%if params.postamble:
#
${params.postamble}
%endif
</%def>"""
        self.tmpl = Template(common_defs + bodytempl)

    def render(self, parts):
        """
        Render the template for *parts* and return the resulting text.

        The template sees ``params`` and ``parts``, the modules ``os``,
        ``sys``, ``pipes``, ``math``, ``random`` and ``json``, and the
        helpers ``to_tss`` and ``fmtsec``.
        """
        import math
        import random

        namespace = dict(
            # data
            params=self.params,
            parts=parts,
            # modules
            os=os,
            sys=sys,
            pipes=pipes,
            math=math,
            random=random,
            json=json,
            # local utilities
            to_tss=_ts_to_tss,
            fmtsec=lambda d: ("%.3f" % d),
        )
        return self.tmpl.render(**namespace)
def _video_split_by_blank_main():
    """
    Entry point: print a shell script that splits the input at blank parts.

    The template comes from ``--render_template`` (text),
    ``--render_templatefile`` (a file) or, if neither is given, the
    built-in template, which emits one ffmpeg command line per part.
    """
    ap = _build_argparser_common()
    ap.add_argument("--trim_s_fadein", type=float, default=0.0)
    ap.add_argument("--trim_e_fadeout", type=float, default=0.0)
    ap.add_argument("--render_template")
    ap.add_argument("--render_templatefile")
    ap.add_argument("--extra_ffmpeg_outopts", default="")
    args = ap.parse_args()
    if args.render_template:
        tmpl = args.render_template
    elif args.render_templatefile:
        # read through a context manager so the handle is closed
        with io.open(args.render_templatefile, "r") as fi:
            tmpl = fi.read()
    else:
        tmpl = """\
${bsh_common_header()}
% for (i, first, last), (s, e), (ss1, ss2, to) in parts:
<%
ss1a = "-ss " + to_tss(ss1) if ss1 else ""
ss2a = "-ss " + to_tss(ss2) if ss2 else ""
toa = "-to %s" % to_tss(to)
#
vfs = list(params.vfs_common)
afs = list(params.afs_common)
if first and params.trim_s_fadein:
    d = params.trim_s_fadein
    par = "t=in:st={:.3f}:d={:.3f}".format(ss1, d)
    vfs.append("fade={}".format(par))
    afs.append("afade={}".format(par))
if last and params.trim_e_fadeout:
    st = to - params.trim_e_fadeout
    d = params.trim_e_fadeout
    par = "t=out:st={:.3f}:d={:.3f}".format(st, d)
    vfs.append("fade={}".format(par))
    afs.append("afade={}".format(par))
vf = '-vf "{}"'.format(",".join(vfs)) if vfs else ""
af = '-af "{}"'.format(",".join(afs)) if afs else ""
#
ffmpeg_n = '"${ffmpeg%03d:-${ffmpeg}}"' % (i + 1)
out_n = ('"${outbase}' + params.seqno_format + '.${ext}"') % (i + 1)
exit_n = "${exit%03d:-:}" % (i + 1)
ffmpeg_cmdline = " ".join(
filter(
None,
[ffmpeg_n, '''-y \\\\
''',
ss1a,
'-i', '"${inmov}"', '''\\\\
''',
vf, '''\\\\
''' if vf else "",
af, '''\\\\
''' if af else "",
ss2a, toa, params.extra_ffmpeg_outopts, out_n]))
%>
# ${to_tss(s)} -> ${to_tss(e)} (${fmtsec(e - s)} secs)
${ffmpeg_cmdline}
${exit_n}
% endfor
${bsh_common_footer()}"""
    #
    impl = _DetectorResult(args)
    print(
        _Renderer(impl.params, tmpl).render(
            list(impl.get_part_ranges(kfs=5.0))),
        end="")
def _video_insert_chmark_by_blank_main():
    """
    Entry point: print a shell script that adds chapter marks at blank parts.

    The generated script feeds an FFMETADATA1 document with one
    ``[CHAPTER]`` per part to ffmpeg through a here-document; with
    ``--replace`` it also moves the result over the input file (keeping the
    original as ``<input>.orig``).
    """
    parser = _build_argparser_common()
    parser.add_argument("--chapter_title_base", default="ch")
    parser.add_argument("--replace", action="store_true")
    parser.add_argument("--dont_insert_endmark", action="store_true")
    parser.add_argument("--extra_ffmpeg_outopts", default="-c copy")
    chapter_script = """\
${bsh_common_header()}
extra_ffmpeg_outopts="${params.extra_ffmpeg_outopts}"
#<%text>
"${ffmpeg}" -y \\
-i "${inmov}" \\
-i pipe: -map_metadata 1 ${extra_ffmpeg_outopts} "${outbase}.${ext}" <<'__END__'
;FFMETADATA1</%text>
% for (i, _, last), (s, e), _ in parts:
[CHAPTER]
TIMEBASE=1/1000
# ${to_tss(s)} -> ${to_tss(e)} (${fmtsec(e - s)} secs)
START=${int(s * 1000)}
END=${int(e * 1000)}
title=${params.chapter_title_base}${params.seqno_format % (i + 1)}
% if last and not params.dont_insert_endmark:
[CHAPTER]
TIMEBASE=1/1000
START=${int(e * 1000)}
END=${int(e * 1000)}
title=end
% endif
% endfor
__END__
% if params.replace:
<%text>if test $? -eq 0 ; then
mv -fv "${inmov}" "${inmov}".orig && \\
mv -fv "${outbase}.${ext}" "${inmov}"
fi</%text>
% endif
${bsh_common_footer()}"""
    detected = _DetectorResult(parser.parse_args())
    chapters = list(detected.get_part_ranges())
    script = _Renderer(detected.params, chapter_script).render(chapters)
    print(script, end="")
if __name__ == '__main__':
    import doctest

    # the doctests run on every invocation, before the real work
    doctest.testmod()
    # One file serves both commands; the name it is invoked under (see the
    # hard link suggested with this script) selects the behaviour.
    _me = os.path.basename(sys.argv[0])
    _entry = (
        _video_split_by_blank_main
        if "video_split_by_blank" in _me
        else _video_insert_chmark_by_blank_main)
    _entry()
ln video_split_by_blank.py video_insert_chmark_by_blank.py
Requires: Mako (pip install mako)
user template example:
${bsh_common_header()}
ffmpeg -y -i <%text>"${inmov}"</%text> \
-i 1.png \
-i 2.png \
-i 3.jpg \
-i 4.jpg \
-i 5.jpg \
-i 6.jpg \
-filter_complex "
% for (i, first, last), (s, e), (ss1, ss2, to) in parts:
[${i % 6 + 1}:v]
setsar=1
,scale=-1:640
,pad='1280:720:(max(iw,ow)-min(iw,ow))/2:(max(ih,oh)-min(ih,oh))/2'
,loop=loop=-1:size=2
,trim=0:${fmtsec(e - s)}
,setpts=PTS-STARTPTS
[v_${i}];
% endfor
${"".join(["[v_%d]" % j for j in range(len(parts))])}
concat=a=0:n=${len(parts)}
,format=yuv420p
[v]
" -map '[v]' -map '0:a' -c:a copy <%text>"${outbase}.mp4"</%text>
${bsh_common_footer()}
Another template example:
${bsh_common_header()}
ffmpeg -y -i <%text>"${inmov}"</%text> \
-i _bg1.png \
-filter_complex "
% for (i, first, last), (s, e), (ss1, ss2, to) in parts:
[1:v]
setsar=1
,pad='1280:720:${random.randint(0, 1280 - 640)}:${random.randint(0, 720 - 400)}\
:${"0x{x:02X}{x:02X}{x:02X}".format(x=random.randint(0, 64))}'
,loop=loop=-1:size=2
,trim=0:${fmtsec(e - s)}
,setpts=PTS-STARTPTS
[v_${i}];
% endfor
${"".join(["[v_%d]" % j for j in range(len(parts))])}concat=a=0:n=${len(parts)},format=yuv420p[v]
" -map '[v]' -map '0:a' -c:a copy <%text>"${outbase}.mp4"</%text>
${bsh_common_footer()}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
see http://hhsprings.pinoko.jp/site-hhs/2019/02/%E5%8B%95%E7%94%BB%E3%82%92%E3%80%8C%E7%9C%9F%E3%81%A3%E9%BB%92%E3%81%8B%E3%81%A4%E7%84%A1%E9%9F%B3%E3%80%8D%E9%83%A8%E5%88%86%E3%81%AB%E5%9F%BA%E3%81%A5%E3%81%84%E3%81%A6%E5%88%86%E5%89%B2%E3%81%AA-f/ (Japanese)