Skip to content

Instantly share code, notes, and snippets.

@FichteFoll
Last active May 10, 2019 06:41
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save FichteFoll/9184e0ef75df71d7da184c485caf5266 to your computer and use it in GitHub Desktop.
Save FichteFoll/9184e0ef75df71d7da184c485caf5266 to your computer and use it in GitHub Desktop.
Snap start and end times of ASS subtitles to scene changes using WWXD
#!/usr/bin/env python3.6
"""Snap start and end times of ASS subtitles to scene changes using WWXD.
Does not work with variable frame rate (VFR).
usage: snap_scenechanges.py [-h] [--epsilon EPSILON] [-o OUTPUT] [-v]
sub_path video_path
positional arguments:
sub_path Path to subtitle file.
video_path Path to video file.
optional arguments:
-h, --help show this help message and exit
--epsilon EPSILON Number of frames to search for scene changes around
each frame.
-o OUTPUT, --output OUTPUT
Output path. By default, the input path with
`_snapped` appended.
-v, --verbose Increase verbosity.
Requires:
- Vapoursynth
- Vapoursynth WWXD plugin (https://github.com/dubhater/vapoursynth-wwxd)
- ffms2
*or*
L-SMASH plugin
- python-ass (https://github.com/rfw/python-ass)
"""
# Copyright 2017 FichteFoll <fichtefoll2@googlemail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
# OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# TODO
# - improve caching a lot
# use a list of len(clip) with tri-state logic
# and dump/load from or to file (disableable)
import argparse
from collections import defaultdict
from itertools import cycle, islice
import logging
import math
from pathlib import Path
import sys
from datetime import timedelta
from typing import List, Optional, Iterable
import ass
import vapoursynth
__version__ = "0.2.2"
# Module-level logger; its handler and level are configured in main().
l = logging.getLogger(__name__)
# VapourSynth core object; main() looks up the source plugins on it.
core = vapoursynth.get_core()
# itertools recipe
def roundrobin(*iterables: Iterable) -> Iterable:
    """Yield one item from each iterable in turn until all are exhausted.

    roundrobin('ABC', 'D', 'EF') --> A D E B F C

    Adapted from the itertools recipe credited to George Sakkis.
    """
    remaining = len(iterables)
    # Endless rotation over the bound ``__next__`` of every input iterator.
    pullers = cycle(iter(source).__next__ for source in iterables)
    while remaining > 0:
        try:
            for pull in pullers:
                yield pull()
        except StopIteration:
            # The iterator that just ran dry was the last entry taken from the
            # rotation, so the next ``remaining`` entries are exactly the
            # surviving iterators, in order. Rebuild the rotation from them.
            remaining -= 1
            pullers = cycle(islice(pullers, remaining))
def negate(iterable: Iterable) -> Iterable:
    """Lazily yield the arithmetic negation of every item in ``iterable``."""
    yield from (-value for value in iterable)
def parse_args(args: Optional[List[str]]) -> argparse.Namespace:
    """Parse the command-line arguments of the script.

    ``args`` is the argument list without the program name; it is handed to
    ``ArgumentParser.parse_args`` unchanged (so ``None`` lets argparse fall
    back to the process arguments).
    """
    description = "Snap start and end times of subtitles to scene changes using WWXD."
    parser = argparse.ArgumentParser(description=description)

    # The two required positional paths, in command-line order.
    positionals = (
        ("sub_path", "Path to subtitle file."),
        ("video_path", "Path to video file."),
    )
    for dest, help_text in positionals:
        parser.add_argument(dest, type=Path, help=help_text)

    parser.add_argument(
        "--epsilon",
        type=int,
        default=7,
        help="Number of frames to search for scene changes around each frame.",
    )
    parser.add_argument(
        "-o", "--output",
        type=Path,
        help="Output path. By default, the input path with `_snapped` appended.",
    )
    parser.add_argument(
        "-v", "--verbose",
        action='store_true',
        help="Increase verbosity.",
    )
    return parser.parse_args(args)
def time_to_frame(time: timedelta, framerate: float, floor: bool = True) -> int:
    """Convert a timestamp to a frame number at a constant ``framerate``.

    The fractional frame position is rounded down when ``floor`` is true
    and rounded up otherwise.
    """
    rounding = math.floor if floor else math.ceil
    return rounding(time.total_seconds() * framerate)
def frame_to_time(frame: int, framerate: float, floor: bool = True) -> timedelta:
    """Convert a frame number to a timestamp half a frame away from it.

    With ``floor`` true the timestamp lies half a frame before ``frame``
    (the frame position is clamped at zero); otherwise it lies half a frame
    after ``frame``. The result is rounded to two decimals of a second.
    """
    shifted = max(0, frame - 0.5) if floor else frame + 0.5
    # round to centiseconds because python-ass floors by default
    return timedelta(seconds=round(shifted / framerate, 2))
# Cache of frames already identified as scene changes, keyed by clip, for
# speedup. Only positive results are stored; frames that turned out not to be
# scene changes are evaluated again when probed a second time.
_scene_change_cache = defaultdict(set)


def find_nearest_scenechange(
    clip, frame: int, epsilon: int, prefer_forward: bool = False
) -> Optional[int]:
    """Return the scene-change frame closest to ``frame``, or ``None``.

    Frames are probed at distances 1 to ``epsilon`` on both sides of
    ``frame``, nearest first. ``frame`` itself (offset 0) is never probed.
    Candidates outside ``0 .. len(clip) - 1`` are skipped.

    Args:
        clip: Object supporting ``len()`` and ``get_frame(n)``, whose frames
            expose a truthy/falsy ``props.Scenechange``. It must be hashable
            because it is used as the cache key.
        frame: Frame number to search around.
        epsilon: Maximum distance, in frames, to search in each direction.
        prefer_forward: When two candidates are equally distant, test the
            later frame first. By default the earlier frame is tested first.

    Returns:
        The first scene-change frame found in probe order, or ``None`` if
        there is none within ``epsilon`` frames.
    """
    # Probe order is [-1, +1, -2, +2, ...] by default and
    # [+1, -1, +2, -2, ...] when a forward match is preferred.
    directions = (1, -1) if prefer_forward else (-1, 1)
    known_changes = _scene_change_cache[clip]
    last_frame = len(clip) - 1

    for distance in range(1, epsilon + 1):
        for direction in directions:
            test_frame = frame + direction * distance
            if not 0 <= test_frame <= last_frame:
                continue
            if test_frame in known_changes:
                return test_frame
            if clip.get_frame(test_frame).props.Scenechange:
                known_changes.add(test_frame)
                return test_frame
    return None
def snap_keyframes(script, input_clip, epsilon: int):
    """Snap the start and end of every Dialogue event to a nearby scene change.

    ``script.events`` is modified in place: for each event whose ``TYPE`` is
    ``"Dialogue"``, the start and end times are replaced independently when a
    scene change is found within ``epsilon`` frames of the respective time.
    Times without a nearby scene change are left untouched.

    Args:
        script: Parsed subtitle document exposing ``events``.
        input_clip: VapourSynth video clip the subtitles belong to.
        epsilon: Number of frames to search around each timestamp.
    """
    # We resize to speed up the WWXD filter.
    # The target resolution is kinda arbitrary,
    # but seems to produce the "best results".
    # Additionally, we ensure that we have 420P8 before scene detection.
    # NOTE(review): the original comment said scxvid requires that format,
    # but the filter used here is WWXD; confirm it has the same requirement.
    clip = input_clip.resize.Bilinear(640, 360, format=vapoursynth.YUV420P8)
    clip = clip.wwxd.WWXD()

    # Use the clip's real denominator; a hard-coded 1001 is only correct for
    # NTSC-style rates such as 24000/1001 and breaks e.g. 25/1 material.
    framerate = clip.fps_num / clip.fps_den
    l.info("Video frame rate: %.3f", framerate)

    l.info("Snapping...")
    # Iterate over dialog lines and adjust timestamps
    for event in script.events:
        if event.TYPE != "Dialogue":
            continue
        l.debug("Checking line: %s", event.dump())

        # Timestamps (should) always point between two frames.
        #
        # Our goal here is to have the first frame *after* the start time
        # and the first frame *after* the end time to be a scene change.
        # Thus, we ceil the timestamps when loading
        # but floor the timestamps of scene changes we found when dumping.
        start_frame = time_to_frame(event.start, framerate, floor=False)
        end_frame = time_to_frame(event.end, framerate, floor=False)

        nearest_start = find_nearest_scenechange(clip, start_frame, epsilon)
        nearest_end = find_nearest_scenechange(clip, end_frame, epsilon)

        if nearest_start is not None:
            event.start = frame_to_time(nearest_start, framerate, floor=True)
            l.debug("  Adjusted start by %+d frames", nearest_start - start_frame)
        if nearest_end is not None:
            event.end = frame_to_time(nearest_end, framerate, floor=True)
            l.debug("  Adjusted end by %+d frames", nearest_end - end_frame)
        l.debug("  New line: %s", event.dump())

    l.debug("List of frames with scene changes: %s",
            sorted(_scene_change_cache[clip]))
def main(args: List[str] = None) -> int:
    """Run the command-line tool and return its exit status.

    Returns 0 on success, 1 when the subtitle path does not have the ``.ass``
    suffix, and 2 when neither the L-SMASH nor the ffms2 plugin is available
    on the VapourSynth core.
    """
    params = parse_args(args)

    # configure logging
    l.addHandler(logging.StreamHandler())
    l.setLevel(logging.DEBUG if params.verbose else logging.INFO)
    l.debug("snap_scenechanges.py version %s", __version__)

    # verify args
    subtitle_path = params.sub_path
    if subtitle_path.suffix != ".ass":
        l.error("Only works with Advanced Substation Alpha subtitles (.ass)")
        return 1

    if params.output is None:
        # Default output: same directory and suffix, `_snapped` added to the stem.
        snapped_name = f"{subtitle_path.stem}_snapped{subtitle_path.suffix}"
        params.output = subtitle_path.with_name(snapped_name)

    l.info("Parsing subtitles...")
    # I think ASS files universally use a BOM
    with subtitle_path.open(encoding='utf-8-sig') as sub_file:
        script = ass.parse(sub_file)

    l.info("Opening video...")
    video_source = str(params.video_path)
    # L-SMASH is tried first; ffms2 is the fallback.
    if hasattr(core, 'lsmas'):
        clip = core.lsmas.LWLibavSource(video_source)
    elif hasattr(core, 'ffms2'):
        l.info("Falling back to ffms2")
        clip = core.ffms2.Source(video_source)
    else:
        l.error("ffms2 or L-SMASH plugins are not available")
        return 2

    # do the actual work
    snap_keyframes(script, clip, params.epsilon)

    # save new file
    l.info("Writing subtitles...")
    with params.output.open('w', encoding='utf-8-sig') as out_file:
        script.dump_file(out_file)

    return 0
# Script entry point: the status returned by main() becomes the exit code.
if __name__ == "__main__":
    raise SystemExit(main())
@FichteFoll
Copy link
Author

The cache could be a lot better.

If you use this and would like to have better caching (see the TODO comment), reach out to me through some channel other than these comments, because I am not notified about them. You'll figure something out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment