Skip to content

Instantly share code, notes, and snippets.

@kageru
Forked from FichteFoll/snap_scenechanges.py
Last active January 28, 2017 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kageru/0e4b4f0e8b443c59552b66d5f8b55d93 to your computer and use it in GitHub Desktop.
Save kageru/0e4b4f0e8b443c59552b66d5f8b55d93 to your computer and use it in GitHub Desktop.
Snap start and end times of ASS subtitles to scene changes using WWXD
#!/usr/bin/env python3.6
"""Snap start and end times of ASS subtitles to scene changes using WWXD.
Does not work with variable frame rate (VFR).
usage: snap_scenechanges.py [-h] [--epsilon EPSILON] [-o OUTPUT] [-v]
sub_path video_path
positional arguments:
sub_path Path to subtitle file.
video_path Path to video file.
optional arguments:
-h, --help show this help message and exit
--epsilon EPSILON Number of frames to search for scene changes around
each frame.
-o OUTPUT, --output OUTPUT
Output path. By default, the input path with
`_snapped` appended.
-v, --verbose Increase verbosity.
Requires:
- Vapoursynth
- Vapoursynth WWXD plugin (https://github.com/dubhater/vapoursynth-wwxd)
- lsmas
- python-ass (https://github.com/rfw/python-ass)
"""
# Copyright 2017 FichteFoll <fichtefoll2@googlemail.com>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
# OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Slight modifications: kageru@encode.moe
import argparse
from collections import defaultdict
from itertools import cycle, islice
import logging
import math
from pathlib import Path
import sys
from datetime import timedelta
from typing import List, Optional, Iterable
from multiprocessing import Pool
import ass
import vapoursynth
l = logging.getLogger(__name__)
core = vapoursynth.get_core()
parser = argparse.ArgumentParser(description="Snap start and end times of subtitles to scene changes using WWXD.")
parser.add_argument("sub_path", type=Path, help="Path to subtitle file.")
parser.add_argument("video_path", type=Path, help="Path to video file.")
parser.add_argument("--epsilon", type=int, default=7, help="Number of frames to search for scene changes around each frame.")
parser.add_argument("-o", "--output", type=Path, help="Output path. By default, the input path with `_snapped` appended.")
parser.add_argument("-v", "--verbose", action='store_true', help="Increase verbosity.")
params = parser.parse_args()
epsilon = params.epsilon
# Start by making a resized and wwxd'd clip.
# We resize to speed-up the wwxd filter.
# The target resultion is kinda arbitrary,
# but seems to produce the "best results"
# while being reasonably fast.
# Note that this would technically work with ffms2 (which is faster),
# but ffms2 is known to have issues with frame precise access and indexing of m2ts files,
# so lsmas is used instead.
# There are some limitations regarding multithreading under windows,
# and defining snap_line() as a global function and these as global parameters
# seems to be one of the easier workarounds, albeit an ugly one.
l.info("Opening video...")
clip = core.lsmas.LWLibavSource(str(params.video_path)).resize.Bilinear(640, 360, format=vapoursynth.YUV420P8).wwxd.WWXD()
framerate = clip.fps_num / 1001
# itertools recipe
def roundrobin(*iterables: Iterable) -> Iterable:
"roundrobin('ABC', 'D', 'EF') --> A D E B F C"
# Recipe credited to George Sakkis
pending = len(iterables)
nexts = cycle(iter(it).__next__ for it in iterables)
while pending:
try:
for next_ in nexts:
yield next_()
except StopIteration:
pending -= 1
nexts = cycle(islice(nexts, pending))
def negate(iterable: Iterable) -> Iterable:
for i in iterable:
yield -i
def time_to_frame(time: timedelta, framerate: float, floor: bool = True) -> int:
frame = time.total_seconds() * framerate
if floor:
return math.floor(frame)
else:
return math.ceil(frame)
def frame_to_time(frame: int, framerate: float, floor: bool = True) -> timedelta:
if floor:
middle_frame = max(0, frame - 0.5)
else:
middle_frame = frame + 0.5
secs = middle_frame / framerate
secs = round(secs, 2) # round to centiseconds because python-ass floors by default
return timedelta(seconds=secs)
# cache scene-changing frames for speedup (by clip)
_scene_change_cache = defaultdict(set)
def find_nearest_scenechange(clip, frame: int, epsilon: int, prefer_forward: bool = False) \
-> Optional[int]:
# We want an iterator like [-1, 1, -2, 2, -3, 3, -4, 4, -5, 5, -6, 6, -7, 7]
base_range = range(1, epsilon + 1)
offsets = roundrobin(base_range, negate(base_range))
if not prefer_forward:
offsets = negate(offsets)
else:
offsets = roundrobin(negate(base_range), negate(base_range))
# regarding the +1: WWXD sometimes marks both adjacent frames as a scenechange
# in that case, we only want the second frame
for offset in offsets:
test_frame = frame + offset
if test_frame in _scene_change_cache[clip] and test_frame + 1 in _scene_change_cache[clip]:
return test_frame + 1
if test_frame in _scene_change_cache[clip]:
return test_frame
is_scenechange = clip.get_frame(test_frame).props.Scenechange
next_is_scenechange = clip.get_frame(test_frame+1).props.Scenechange
if is_scenechange and next_is_scenechange:
_scene_change_cache[clip].add(test_frame+1)
return test_frame+1
if is_scenechange:
_scene_change_cache[clip].add(test_frame)
return test_frame+1
return None
def snap_line(event):
if event.TYPE != "Dialogue":
return None
l.debug("Checking line: %s", event.dump())
# my desperate multithreading attempts broke the logging ¯\_(ツ)_/¯
if params.verbose:
print(event.text)
# Timestamps (should) always point between two frames.
#
# Our goal here is to have the first frame *after* the start time
# and the first frame *after* the end time to be a scene change.
# Thus, we ceil the timestamps when loading
# but floor the timestamps of scene changes we found when dumping.
start_frame = time_to_frame(event.start, framerate, floor=False)
end_frame = time_to_frame(event.end, framerate, floor=False)
nearest_start = find_nearest_scenechange(clip, start_frame, epsilon)
nearest_end = find_nearest_scenechange(clip, end_frame, epsilon)
if nearest_start is not None:
event.start = frame_to_time(nearest_start, framerate, floor=True)
if params.verbose:
print(" Adjusted start by {:d} frames".format(nearest_start - start_frame))
l.debug(" Adjusted start by %+d frames", nearest_start - start_frame)
if nearest_end is not None:
event.end = frame_to_time(nearest_end, framerate, floor=True)
if params.verbose:
print(" Adjusted end by {:d} frames".format(nearest_end - end_frame))
l.debug(" Adjusted end by %+d frames", nearest_end - end_frame)
l.debug(" New line: %s", event.dump())
return None
def snap_keyframes(script, video_path: Path, epsilon: int):
l.info("Video frame rate: %.3f", framerate)
l.info("Snapping...")
# Iterate over dialog lines and adjust timestamps
# In my tests, 4 threads were sufficient to saturate most systems,
# even if the number of logical threads was significantly higher.
# Since the number of threads linearly increases memory consumption,
# we do not want to spawn more threads than necessary.
pool = Pool(4)
pool.map(snap_line, script.events)
pool.close()
pool.join()
l.debug("List of frames with scene changes: %s",
list(sorted(_scene_change_cache[clip])))
def main(args: List[str] = None) -> int:
# configure logging
l.addHandler(logging.StreamHandler())
log_level = logging.DEBUG if params.verbose else logging.INFO
l.setLevel(log_level)
# verify args
if not params.sub_path.suffix == ".ass":
l.error("Only works with Advanced Substation Alpha subtitles (.ass)")
return 1
if not params.output:
params.output = params.sub_path.with_name("{:s}_snapped{:s}".format(params.sub_path.name, params.sub_path.suffix))
# load ass
l.info("Parsing subtitles...")
with params.sub_path.open(encoding='utf-8') as f:
script = ass.parse(f)
# do wörk
snap_keyframes(script, params.video_path, params.epsilon)
# save new file
l.info("Writing subtitles...")
with params.output.open('w', encoding='utf-8-sig') as f: # I think the BOM is required
script.dump_file(f)
return 0
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment