#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module is intended as an example of one application of
"align_videos_by_soundtrack.align". Suppose a concert is shot by
several people from several angles. In most cases the individual
recordings do not start and stop at exactly the same time. This is
where "align_videos_by_soundtrack.align" comes in: based on the
offsets it reports, this script combines the movies from the
different angles into a tiled layout with "hstack" and "vstack".
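
For example (the script file name and the input names below are only
placeholders; any four files containing both video and audio will do
for the default 2x2 shape):

    python simple_stack_videos_by_sound_track.py \\
        --shape "[2, 2]" -o merged.mp4 \\
        cam1.mp4 cam2.mp4 cam3.mp4 cam4.mp4

By default this prints a bash script that runs the generated ffmpeg
command; pass "--mode direct" to call ffmpeg immediately.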
"""
from __future__ import unicode_literals
from __future__ import absolute_import
import json
import sys
import os
from itertools import chain
import numpy as np
from align_videos_by_soundtrack.align import SyncDetector
from align_videos_by_soundtrack.communicate import check_call
class _Filter(object):
"""
>>> f = _Filter()
>>> f.iv.append("[0:v]")
>>> f.descs.append("scale=600:400")
>>> f.descs.append("setsar=1")
>>> f.ov.append("[v0]")
>>> print(f.to_str())
[0:v]scale=600:400,setsar=1[v0]
>>> #
>>> f = _Filter()
>>> f.iv.append("[0:v]")
>>> f.iv.append("[1:v]")
>>> f.descs.append("concat")
>>> f.ov.append("[vc0]")
>>> print(f.to_str())
[0:v][1:v]concat[vc0]
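
    When both video and audio labels are present they are interleaved
    pairwise, e.g.:

    >>> f = _Filter()
    >>> f.iv.append("[0:v]")
    >>> f.ia.append("[0:a]")
    >>> f.iv.append("[1:v]")
    >>> f.ia.append("[1:a]")
    >>> f.descs.append("concat=n=2:v=1:a=1")
    >>> f.ov.append("[vc]")
    >>> f.oa.append("[ac]")
    >>> print(f.to_str())
    [0:v][0:a][1:v][1:a]concat=n=2:v=1:a=1[vc][ac]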
"""
def __init__(self):
self.iv = []
self.ia = []
self.descs = []
self.ov = []
self.oa = []
def _labels_to_str(self, v, a):
if not v and a:
v = [""] * len(a)
if not a and v:
a = [""] * len(v)
return "".join(chain.from_iterable(zip(v, a)))
def to_str(self):
ilabs = self._labels_to_str(self.iv, self.ia)
filterbody = ",".join(self.descs)
olabs = self._labels_to_str(self.ov, self.oa)
return ilabs + filterbody + olabs
class _StreamWithPadBuilder(object):
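    """
    Build, for a single input stream, the filter chains that pad it with
    black video / silent audio before and after the original footage and
    concatenate the pieces.

    Sketch only (idx == 0, a 1.5 s pre padding and no post padding): the
    final concat filter produced by build() would look like

        [prepadv0][prepada0][v0][a0]concat=n=2:v=1:a=1[vc0][ac0]

    and build() returns the whole chain together with its output labels
    ("[vc0]" and "[ac0]" here).
    """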
    def __init__(self, idx, w=960, h=540, sample_rate=44100):
        self._idx = idx

        # black video stream
        fpadv = _Filter()
        fpadv.descs.append(
            "color=c=black:s=%dx%d:d={duration:.3f},setsar=1" % (w, h))
        fpadv.ov.append("[{prepost}padv%d]" % idx)
        self._tmpl_padv = (fpadv.to_str(), "".join(fpadv.ov))
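        # Illustration only (idx == 0 with the default w=960, h=540):
        # the template above expands to
        #   color=c=black:s=960x540:d={duration:.3f},setsar=1[{prepost}padv0]
        # and a later .format(prepost="pre", duration=1.5) turns it into
        #   color=c=black:s=960x540:d=1.500,setsar=1[prepadv0]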
        # silence left; silence right; -> amerge
        fpada = [_Filter(), _Filter(), _Filter()]
        nch = 2
        for i in range(nch):
            fpada[i].descs.append("\
sine=frequency=0:sample_rate={sample_rate}:d={{duration:.3f}}".format(
                sample_rate=sample_rate))
            olab = "[{{prepost}}pada_c{i}_{idx}]".format(idx=idx, i=i)
            fpada[i].oa.append(olab)
            fpada[-1].iv.append(olab)
        fpada[-1].descs.append("amerge={}".format(len(fpada[-1].iv)))
        fpada[-1].oa.append("[{{prepost}}pada{idx}]".format(
            idx=idx))
        self._tmpl_pada = (
            ";\n".join([fpada[i].to_str()
                        for i in range(len(fpada))]),
            "".join(fpada[-1].oa))
        # filter to original video stream
        fbodyv = _Filter()
        fbodyv.iv.append("[%d:v]" % idx)
        fbodyv.descs.append("scale=%d:%d,setsar=1" % (w, h))
        fbodyv.ov.append("[v%d]" % idx)
        self._bodyv = (fbodyv.to_str(), "".join(fbodyv.ov))

        # filter to original audio stream
        fbodya = _Filter()
        fbodya.ia.append("[%d:a]" % idx)
        fbodya.descs.append("aresample={sample_rate}".format(
            sample_rate=sample_rate))
        fbodya.oa.append("[a%d]" % idx)
        self._bodya = (fbodya.to_str(), "".join(fbodya.oa))
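        # Illustration only (idx == 0, default parameters): the two body
        # filters above read
        #   [0:v]scale=960:540,setsar=1[v0]
        #   [0:a]aresample=44100[a0]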
        #
        self._result = []
        self._fconcat = _Filter()

    def _add_prepost(self, duration, prepost):
        if duration <= 0:
            return
        self._result.append(
            self._tmpl_padv[0].format(prepost=prepost, duration=duration))
        self._fconcat.iv.append(self._tmpl_padv[1].format(prepost=prepost))
        #
        self._result.append(
            self._tmpl_pada[0].format(prepost=prepost, duration=duration))
        self._fconcat.ia.append(self._tmpl_pada[1].format(prepost=prepost))

    def add_pre(self, duration):
        self._add_prepost(duration, "pre")
        return self

    def add_body(self):
        self._result.append(self._bodyv[0])
        self._fconcat.iv.append(self._bodyv[1])
        self._result.append(self._bodya[0])
        self._fconcat.ia.append(self._bodya[1])
        return self

    def add_post(self, duration):
        self._add_prepost(duration, "post")
        return self

    def build(self):
        if len(self._fconcat.iv) > 1:
            self._fconcat.descs.append(
                "concat=n=%d:v=1:a=1" % len(self._fconcat.iv))
            self._fconcat.ov.append("[vc%d]" % self._idx)
            self._fconcat.oa.append("[ac%d]" % self._idx)
            self._result.append(self._fconcat.to_str())
        else:
            self._fconcat.ov.extend(self._fconcat.iv)
            self._fconcat.oa.extend(self._fconcat.ia)
        #
        return (
            ";\n".join(self._result),
            self._fconcat.ov[0],
            self._fconcat.oa[0])


class _StackVideosFilterGraphBuilder(object):
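    """
    Build the complete "-filter_complex" graph that tiles all inputs.

    Rough sketch for the default 2x2 shape:

      * every input is padded and concatenated by _StreamWithPadBuilder,
        typically ending in labels like [vc0]/[ac0],
      * each row of the tile is joined with "hstack" (into [1v], [2v], ...),
      * the rows are stacked with "vstack" into the final [v],
      * the audio streams are combined with "amerge" plus "pan" into a
        stereo [a], panning the left half of the grid to the left output
        channel and the right half to the right one.
    """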
    def __init__(self, shape=(2, 2), w=960, h=540, sample_rate=44100):
        self._shape = shape
        self._builders = []
        for i in range(shape[0] * shape[1]):
            self._builders.append(
                _StreamWithPadBuilder(i, w, h, sample_rate))

    def set_paddings(self, idx, pre, post):
        self._builders[idx].add_pre(pre)
        self._builders[idx].add_body()
        self._builders[idx].add_post(post)

    def build(self):
        result = []
        _r = []
        len_stacks = len(self._builders)
        for i in range(len_stacks):
            _r.append(self._builders[i].build())
            result.append(_r[i][0])

        # stacks for video
        fvstack = _Filter()
        if self._shape[0] > 1:
            for i in range(self._shape[1]):
                fhstack = _Filter()
                fhstack.iv.extend([
                    _ri[1]
                    for _ri in _r[i * self._shape[0]:(i + 1) * self._shape[0]]])
                fhstack.descs.append("hstack=inputs={}".format(
                    len(_r[i * self._shape[0]:(i + 1) * self._shape[0]])))
                olab = "[{}v]".format(
                    "%d" % (i + 1) if self._shape[1] > 1 else "")
                fhstack.ov.append(olab)
                result.append(fhstack.to_str())
                fvstack.iv.append(olab)
        else:
            fvstack.iv.extend([_ri[1] for _ri in _r])
        if self._shape[1] > 1:
            # vstack
            fvstack.descs.append("vstack=inputs={}".format(self._shape[1]))
            fvstack.ov.append("[v]")
            result.append(fvstack.to_str())

        # stacks for audio (amerge)
        nch = 2
        weight = 1 - np.array(
            [i // self._shape[0] for i in range(self._shape[0] * nch)])
        ch = [
            " + ".join([
                "c%d" % (i)
                for i in range(len(_r) * nch)
                if weight[i % (self._shape[0] * nch)]]),
            " + ".join([
                "c%d" % (i)
                for i in range(len(_r) * nch)
                if (1 - weight)[i % (self._shape[0] * nch)]])
        ]
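        # Illustration only (default 2x2 shape, stereo inputs): after
        # amerge the channels are ordered c0,c1 (input 0), c2,c3 (input 1),
        # c4,c5 (input 2), c6,c7 (input 3), so the pan below becomes roughly
        #   pan=stereo|c0 < c0 + c1 + c4 + c5|c1 < c2 + c3 + c6 + c7
        # i.e. the left column of the tile feeds the left output channel
        # and the right column the right one.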
result.append("""\
{}
amerge=inputs={},
pan=stereo|\\
c0 < {} |\\
c1 < {}
[a]""".format(
"".join([_ri[2] for _ri in _r]),
len_stacks, ch[0], ch[1],))
#
return ";\n\n".join(result)
def _build(args):
    shape = json.loads(args.shape) if args.shape else (2, 2)
    if len(args.files) != shape[0] * shape[1]:
        sys.stderr.write(
            "error: %d input files are required for shape %r\n" % (
                shape[0] * shape[1], shape))
        sys.exit(1)
    files = list(map(os.path.abspath, args.files))
    b = _StackVideosFilterGraphBuilder(
        shape=shape, w=args.w, h=args.h, sample_rate=args.sample_rate)
    with SyncDetector(
            max_misalignment=args.max_misalignment) as det:
        for i, inf in enumerate(det.align(files)):
            b.set_paddings(i, inf[1]["pad"], inf[1]["pad_post"])
    #
    filter_complex = b.build()
    #
    return filter_complex, ['[v]', '[a]']


def main(args=sys.argv):
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "files", nargs="*",
        help="The media files, which must contain both video and audio.")
    parser.add_argument(
        "-o", "--outfile", dest="outfile", default="merged.mp4")
    parser.add_argument(
        '--mode', choices=['script_bash', 'direct'], default='script_bash',
        help="""\
Whether to produce a bash shell script or to call ffmpeg directly.""")
    parser.add_argument(
        '--max_misalignment', type=int, default=2*60,
        help="""\
See the help of alignment_info_by_sound_track.""")
    parser.add_argument(
        '--shape', type=str, default="[2, 2]",
        help="The shape of the tile, like '[2, 2]'.")
    parser.add_argument(
        '--sample_rate', type=int, default=44100,
        help="Sampling rate of the output file.")
    parser.add_argument(
        '--width-per-cell', dest="w", type=int, default=960,
        help="Width of each cell.")
    parser.add_argument(
        '--height-per-cell', dest="h", type=int, default=540,
        help="Height of each cell.")
    extra_ffargs = [
        "-color_primaries", "bt709", "-color_trc", "bt709",
        "-colorspace", "bt709"
    ]
    args = parser.parse_args(args[1:])

    fc, maps = _build(args)

    if args.mode == "script_bash":
        print("""\
#! /bin/sh
ffmpeg -y \\
{} \\
-filter_complex "
{}
" {} \\
{} \\
"{}"
""".format(" ".join(['-i "{}"'.format(f) for f in args.files]),
           fc,
           " ".join(["-map '%s'" % m for m in maps]),
           " ".join(extra_ffargs),
           args.outfile))
    else:
        cmd = ["ffmpeg", "-y"]
        for fn in args.files:
            cmd.extend(["-i", fn])
        cmd.extend(["-filter_complex", fc])
        for m in maps:
            cmd.extend(["-map", m])
        cmd.extend(extra_ffargs)
        cmd.append(args.outfile)

        check_call(cmd)


#
if __name__ == '__main__':
    main()
This script was merged here.
