Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#! /bin/env python
# -*- coding: utf-8 -*-
"""
This module is an example of one application of
"align_videos_by_soundtrack.align". Suppose that a concert is shot by
several people from several angles. In most cases, each recording
starts and ends at a different time, and this is where
"align_videos_by_soundtrack.align" comes in. Based on the information
obtained from "align_videos_by_soundtrack.align", this script combines
the movies of the multiple angles into a tiled layout with "hstack" and "vstack".
"""
from __future__ import unicode_literals
from __future__ import absolute_import
import json
import sys
import os
from itertools import chain
import numpy as np
from align_videos_by_soundtrack.align import SyncDetector
from align_videos_by_soundtrack.communicate import check_call
class _Filter(object):
    """
    One entry of an ffmpeg filtergraph: input labels, a comma-joined
    list of filter descriptions, and output labels.

    Labels are kept in two parallel lists per side (`iv`/`ia` for the
    inputs, `ov`/`oa` for the outputs) and are rendered interleaved,
    i.e. "v0 a0 v1 a1 ...".

    >>> f = _Filter()
    >>> f.iv.append("[0:v]")
    >>> f.descs.append("scale=600:400")
    >>> f.descs.append("setsar=1")
    >>> f.ov.append("[v0]")
    >>> print(f.to_str())
    [0:v]scale=600:400,setsar=1[v0]
    >>> #
    >>> f = _Filter()
    >>> f.iv.append("[0:v]")
    >>> f.iv.append("[1:v]")
    >>> f.descs.append("concat")
    >>> f.ov.append("[vc0]")
    >>> print(f.to_str())
    [0:v][1:v]concat[vc0]
    >>> #
    >>> f = _Filter()
    >>> f.iv.extend(["[0:v]", "[1:v]"])
    >>> f.ia.append("[0:a]")
    >>> f.descs.append("concat")
    >>> print(f.to_str())
    [0:v][0:a][1:v]concat
    """

    def __init__(self):
        self.iv = []     # input labels, first slot of each pair
        self.ia = []     # input labels, second slot of each pair
        self.descs = []  # filter descriptions, joined with ","
        self.ov = []     # output labels, first slot of each pair
        self.oa = []     # output labels, second slot of each pair

    def _labels_to_str(self, v, a):
        """
        Interleave the label lists `v` and `a` pairwise and concatenate
        them into one string.

        The shorter list is padded with empty strings, so no label is
        lost when the two lists differ in length (a bare `zip` would
        silently truncate to the shorter one).
        """
        npairs = max(len(v), len(a))
        v = list(v) + [""] * (npairs - len(v))
        a = list(a) + [""] * (npairs - len(a))
        return "".join(chain.from_iterable(zip(v, a)))

    def to_str(self):
        """Return "<input labels><desc>,<desc>,...<output labels>"."""
        ilabs = self._labels_to_str(self.iv, self.ia)
        filterbody = ",".join(self.descs)
        olabs = self._labels_to_str(self.ov, self.oa)
        return ilabs + filterbody + olabs
class _StreamWithPadBuilder(object):
    """
    Builds the filtergraph fragment for the input stream number `idx`.

    Segments are queued with add_pre() / add_body() / add_post() and
    rendered by build(): black video plus silent audio as padding
    around the scaled video and resampled audio of the input, joined
    with "concat" when more than one segment was queued.
    """

    def __init__(self, idx, w=960, h=540, sample_rate=44100):
        self._idx = idx

        # Padding video template (black frames). "{duration}" and
        # "{prepost}" remain placeholders; _add_prepost() fills them in.
        black = _Filter()
        black.descs.append(
            "color=c=black:s=%dx%d:d={duration:.3f},setsar=1" % (w, h))
        black.ov.append("[{prepost}padv%d]" % idx)
        self._tmpl_padv = (black.to_str(), "".join(black.ov))

        # Padding audio template: one zero-frequency sine source per
        # channel, then amerge of those sources into a single stream.
        nch = 2
        merge = _Filter()
        silences = []
        for ch in range(nch):
            silence = _Filter()
            silence.descs.append(
                "sine=frequency=0:sample_rate={sample_rate}"
                ":d={{duration:.3f}}".format(sample_rate=sample_rate))
            label = "[{{prepost}}pada_c{i}_{idx}]".format(idx=idx, i=ch)
            silence.oa.append(label)
            merge.iv.append(label)
            silences.append(silence)
        merge.descs.append("amerge={}".format(len(merge.iv)))
        merge.oa.append("[{{prepost}}pada{idx}]".format(idx=idx))
        self._tmpl_pada = (
            ";\n".join([flt.to_str() for flt in silences + [merge]]),
            "".join(merge.oa))

        # Original video stream, scaled to the cell size.
        video = _Filter()
        video.iv.append("[%d:v]" % idx)
        video.descs.append("scale=%d:%d,setsar=1" % (w, h))
        video.ov.append("[v%d]" % idx)
        self._bodyv = (video.to_str(), "".join(video.ov))

        # Original audio stream, resampled to the common sample rate.
        audio = _Filter()
        audio.ia.append("[%d:a]" % idx)
        audio.descs.append(
            "aresample={sample_rate}".format(sample_rate=sample_rate))
        audio.oa.append("[a%d]" % idx)
        self._bodya = (audio.to_str(), "".join(audio.oa))

        # Rendered filter strings so far, and the concat filter that
        # collects the output labels of every queued segment.
        self._result = []
        self._fconcat = _Filter()

    def _add_prepost(self, duration, prepost):
        """
        Queue one padding segment (video, then audio) of length
        `duration`, with `prepost` as the label prefix.  Nothing is
        queued when `duration` is zero or negative.
        """
        if duration <= 0:
            return
        targets = (
            (self._tmpl_padv, self._fconcat.iv),
            (self._tmpl_pada, self._fconcat.ia),
        )
        for (body_tmpl, label_tmpl), labels in targets:
            self._result.append(
                body_tmpl.format(prepost=prepost, duration=duration))
            labels.append(label_tmpl.format(prepost=prepost))

    def add_pre(self, duration):
        """Queue padding labelled "pre"; returns self for chaining."""
        self._add_prepost(duration, "pre")
        return self

    def add_body(self):
        """Queue the original video and audio; returns self for chaining."""
        targets = (
            (self._bodyv, self._fconcat.iv),
            (self._bodya, self._fconcat.ia),
        )
        for (body, label), labels in targets:
            self._result.append(body)
            labels.append(label)
        return self

    def add_post(self, duration):
        """Queue padding labelled "post"; returns self for chaining."""
        self._add_prepost(duration, "post")
        return self

    def build(self):
        """
        Return the tuple (filter text, video output label, audio output
        label) for everything queued so far.
        """
        concat = self._fconcat
        nsegments = len(concat.iv)
        if nsegments <= 1:
            # Nothing to concatenate: the segment's own labels are the
            # outputs.
            concat.ov.extend(concat.iv)
            concat.oa.extend(concat.ia)
        else:
            concat.descs.append("concat=n=%d:v=1:a=1" % nsegments)
            concat.ov.append("[vc%d]" % self._idx)
            concat.oa.append("[ac%d]" % self._idx)
            self._result.append(concat.to_str())
        return (";\n".join(self._result), concat.ov[0], concat.oa[0])
class _StackVideosFilterGraphBuilder(object):
    """
    Assembles the whole filtergraph that tiles several input streams.

    `shape` is (columns, rows): each run of shape[0] consecutive streams
    is joined with "hstack" into one row, and the shape[1] rows are then
    joined with "vstack".  The final video output is labelled "[v]" and
    the final audio output "[a]".
    """

    def __init__(self, shape=(2, 2), w=960, h=540, sample_rate=44100):
        self._shape = shape
        # One per-stream builder for every cell of the tile.
        self._builders = [
            _StreamWithPadBuilder(i, w, h, sample_rate)
            for i in range(shape[0] * shape[1])]

    def set_paddings(self, idx, pre, post):
        """
        Queue leading padding `pre`, the stream itself, and trailing
        padding `post` on the builder of stream number `idx`.
        """
        builder = self._builders[idx]
        builder.add_pre(pre)
        builder.add_body()
        builder.add_post(post)

    def build(self):
        """Return the complete filtergraph as a single string."""
        ncols, nrows = self._shape[0], self._shape[1]
        result = []
        _r = []  # (filter text, video label, audio label) per stream
        for builder in self._builders:
            built = builder.build()
            _r.append(built)
            result.append(built[0])
        len_stacks = len(_r)

        # stacks for video
        fvstack = _Filter()
        if ncols > 1:
            for row in range(nrows):
                cells = _r[row * ncols:(row + 1) * ncols]
                fhstack = _Filter()
                fhstack.iv.extend([cell[1] for cell in cells])
                fhstack.descs.append(
                    "hstack=inputs={}".format(len(cells)))
                # With a single row, the hstack output is already the
                # final "[v]"; otherwise rows are labelled "[1v]", "[2v]"...
                olab = "[{}v]".format(
                    "%d" % (row + 1) if nrows > 1 else "")
                fhstack.ov.append(olab)
                result.append(fhstack.to_str())
                fvstack.iv.append(olab)
        else:
            fvstack.iv.extend([cell[1] for cell in _r])
        if nrows > 1:
            fvstack.descs.append("vstack=inputs={}".format(nrows))
            fvstack.ov.append("[v]")
            result.append(fvstack.to_str())
        elif ncols <= 1 and fvstack.iv:
            # Single cell: neither hstack nor vstack was emitted, so
            # "[v]" would be undefined.  Pass the only stream through
            # unchanged to give it that label.
            fvstack.descs.append("null")
            fvstack.ov.append("[v]")
            result.append(fvstack.to_str())

        # stacks for audio (amerge)
        # After amerge, stream k owns channels c(nch*k) .. c(nch*k+nch-1).
        # Within every row, the first `ncols` of its `ncols * nch`
        # channels are mixed into output c0 and the rest into output c1.
        nch = 2
        period = ncols * nch
        nchannels = len_stacks * nch
        ch = [
            " + ".join([
                "c%d" % i
                for i in range(nchannels) if i % period < ncols]),
            " + ".join([
                "c%d" % i
                for i in range(nchannels) if i % period >= ncols]),
        ]
        result.append("""\
{}
amerge=inputs={},
pan=stereo|\\
c0 < {} |\\
c1 < {}
[a]""".format(
            "".join([cell[2] for cell in _r]),
            len_stacks, ch[0], ch[1],))
        #
        return ";\n\n".join(result)
def _build(args):
    """
    Build the ffmpeg filtergraph for the parsed command-line `args`.

    Reads `args.shape` (JSON text such as "[2, 2]"; (2, 2) when empty),
    `args.files`, `args.w`, `args.h`, `args.sample_rate` and
    `args.max_misalignment`.

    Returns a tuple (filtergraph string, list of output labels to map).
    Writes an error to stderr and exits with status 1 when the number of
    files does not equal shape[0] * shape[1].
    """
    shape = json.loads(args.shape) if args.shape else (2, 2)
    expected = shape[0] * shape[1]
    if len(args.files) != expected:
        # The argparse parser is local to main() and is not reachable
        # from here, so report the problem directly.
        sys.stderr.write(
            "error: shape {}x{} requires exactly {} input files, "
            "but {} were given\n".format(
                shape[0], shape[1], expected, len(args.files)))
        sys.exit(1)
    files = list(map(os.path.abspath, args.files))
    b = _StackVideosFilterGraphBuilder(
        shape=shape, w=args.w, h=args.h, sample_rate=args.sample_rate)
    with SyncDetector(
            max_misalignment=args.max_misalignment) as det:
        # Each alignment result carries, at index 1, a mapping with the
        # leading ("pad") and trailing ("pad_post") padding of the file.
        for i, inf in enumerate(det.align(files)):
            b.set_paddings(i, inf[1]["pad"], inf[1]["pad_post"])
    #
    filter_complex = b.build()
    #
    return filter_complex, ['[v]', '[a]']
def main(args=sys.argv):
    """
    Command-line entry point.

    `args` is an argv-style list; its first element (the program name)
    is skipped.  Depending on "--mode", either prints a shell script
    that runs ffmpeg ("script_bash", the default) or calls ffmpeg
    directly ("direct").
    """
    import argparse
    try:
        from shlex import quote as shell_quote  # Python 3.3+
    except ImportError:
        from pipes import quote as shell_quote  # Python 2

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "files", nargs="*",
        help="The media files which contains both video and audio.")
    parser.add_argument(
        "-o", "--outfile", dest="outfile", default="merged.mp4")
    parser.add_argument(
        '--mode', choices=['script_bash', 'direct'], default='script_bash',
        help="Switching whether to produce bash shellscript "
             "or to call ffmpeg directly.")
    parser.add_argument(
        '--max_misalignment', type=int, default=2*60,
        help="See the help of alignment_info_by_sound_track.")
    parser.add_argument(
        '--shape', type=str, default="[2, 2]",
        help="The shape of the tile, like '[2, 2]'")
    parser.add_argument(
        '--sample_rate', type=int, default=44100,
        help="Sampling rate of the output file.")
    parser.add_argument(
        '--width-per-cell', dest="w", type=int, default=960,
        help="Width of the cell.")
    parser.add_argument(
        '--height-per-cell', dest="h", type=int, default=540,
        help="Height of the cell.")
    extra_ffargs = [
        "-color_primaries", "bt709", "-color_trc", "bt709", "-colorspace", "bt709"
    ]
    args = parser.parse_args(args[1:])
    fc, maps = _build(args)
    if args.mode == "script_bash":
        # File names are shell-quoted so that names containing quotes,
        # "$", backticks or backslashes cannot break (or inject commands
        # into) the generated script.
        print("""\
#! /bin/sh
ffmpeg -y \\
{} \\
-filter_complex "
{}
" {} \\
{} \\
{}
""".format(
            " ".join(["-i {}".format(shell_quote(f)) for f in args.files]),
            fc,
            " ".join(["-map '%s'" % m for m in maps]),
            " ".join(extra_ffargs),
            shell_quote(args.outfile)))
    else:
        # Direct mode passes an argument list, so no shell quoting is
        # involved.
        cmd = ["ffmpeg", "-y"]
        for fn in args.files:
            cmd.extend(["-i", fn])
        cmd.extend(["-filter_complex", fc])
        for m in maps:
            cmd.extend(["-map", m])
        cmd.extend(extra_ffargs)
        cmd.append(args.outfile)
        check_call(cmd)
#
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
@hhsprings

This comment has been minimized.

Copy link
Owner Author

commented Jun 20, 2018

@hhsprings

This comment has been minimized.

Copy link
Owner Author

commented Jun 29, 2018

This script was merged here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.