OpenCV script to match a recurring "boundary between content and post-roll ad" frame and write an EDL to stop playback there
#!/usr/bin/env python2 | |
# -*- coding: utf-8 -*- | |
# pylint: disable=line-too-long | |
"""Utility for generating MPV EDL files to skip recurring post-roll ads. | |
Instructions: | |
1. Use MPV's screenshot hotkey to extract a frame that's consistently present | |
at the boundary between the content and the ad. | |
2. Run this script with the screenshot specified via the --template argument. | |
3. Play the resulting EDL file in MPV. | |
(MPV EDL files are like playlists which can specify pieces of video files | |
rather than entire files) | |
--snip-- | |
TODO: | |
- Gather and maintain statistics on which templates matched in a given folder | |
so that the order in which templates are tried can learn as a way to optimize | |
the total runtime. | |
- Read http://docs.opencv.org/trunk/d4/dc6/tutorial_py_template_matching.html | |
- Consider rewriting in Rust to ease shaking the bugs out: | |
http://www.poumeyrol.fr/doc/opencv-rust/opencv/imgproc/fn.match_template.html | |
Sources used to develop this: | |
- https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html#playing-video-from-file | |
- https://pythonprogramming.net/template-matching-python-opencv-tutorial/ | |
- http://stackoverflow.com/q/9629071 | |
- http://stackoverflow.com/a/10979030 | |
- http://stackoverflow.com/a/2981202 | |
- https://github.com/mpv-player/mpv/blob/master/DOCS/edl-mpv.rst | |
- http://smplayer.sourceforge.net/en/mpv | |
Stuff I tried but threw out because of a bug outside my control: | |
- http://www.mplayerhq.hu/DOCS/HTML/en/edl.html | |
- http://forum.smplayer.info/viewtopic.php?p=15706#p15706 | |
Sources used during development: | |
- http://www.pyimagesearch.com/2016/03/07/transparent-overlays-with-opencv/ | |
""" # NOQA | |
from __future__ import (absolute_import, division, print_function, | |
with_statement, unicode_literals) | |
__author__ = "Stephan Sokolow (deitarion/SSokolow)" | |
__appname__ = "TYT Post-Roll Ad Eliminator" | |
__version__ = "0.1" | |
__license__ = "MIT" | |
VIDEO_GLOBS = ['*.mp4', '*.webm'] | |
FRAME_GLOBS = ['*.png', '*.jpg'] | |
import fnmatch, logging, os, re | |
import cv2 as cv | |
import numpy as np | |
log = logging.getLogger(__name__) | |
# Borrowed from: | |
# https://github.com/ssokolow/game_launcher/blob/master/src/util/common.py | |
# (Revision: bc784a5fe3c2d4275fef5ec16612bd67142eb0f8) | |
# TODO: Put this up on PyPI so I don't have to copy it around. | |
def multiglob_compile(globs, prefix=False, re_flags=0): | |
"""Generate a single "A or B or C" regex from a list of shell globs. | |
:param globs: Patterns to be processed by :mod:`fnmatch`. | |
:type globs: iterable of :class:`~__builtins__.str` | |
:param prefix: If ``True``, then :meth:`~re.RegexObject.match` will | |
perform prefix matching rather than exact string matching. | |
:type prefix: :class:`~__builtins__.bool` | |
:rtype: :class:`re.RegexObject` | |
""" | |
if not globs: | |
# An empty globs list should only match empty strings | |
return re.compile('^$') | |
elif prefix: | |
globs = [x + '*' for x in globs] | |
return re.compile('|'.join(fnmatch.translate(x) for x in globs), re_flags) | |
video_ext_re = multiglob_compile(VIDEO_GLOBS, re.I) | |
frame_ext_re = multiglob_compile(FRAME_GLOBS, re.I) | |
def edl_path_for(video_path): | |
"""Single place where EDL-path lookup is defined""" | |
return os.path.splitext(video_path)[0] + '.mpv.edl' | |
def has_edl(video_path, edl_path=None): | |
"""Unified definition of how to check whether we should skip a file""" | |
edl_path = edl_path or edl_path_for(video_path) | |
# TODO: Also handle the "is EDL" case | |
if os.path.exists(edl_path): | |
log.info("EDL already exists. Skipping %r", video_path) | |
return True | |
return False | |
def seek_stream(stream, offset): | |
"""If offset is negative, it will be treated as a "seconds from the end" | |
value, analogous to Python list indexing. | |
""" | |
fps = stream.get(cv.CAP_PROP_FPS) # pylint: disable=no-member | |
if offset > 0: | |
offset *= fps | |
elif offset < 0: | |
frame_count = stream.get( | |
cv.CAP_PROP_FRAME_COUNT) # pylint: disable=no-member | |
offset = frame_count + (offset * fps) | |
# It seems seeking via CAP_PROP_POS_MSEC doesn't work | |
stream.set(cv.CAP_PROP_POS_FRAMES, offset) # pylint: disable=no-member | |
def find_frame(stream, template, start_pos=None, last_match=None): | |
"""Given a video stream and a frame, find its offset in seconds. | |
If start_pos is negative, it will be treated as a "seconds from the end" | |
value, analogous to Python list indexing. | |
Does NOT release the stream when finished so it can be used in a manner | |
similar to file.seek(). | |
Returns the offset in seconds or None for no match. | |
""" | |
offset, match = 0, None | |
# Seek to near the end to minimize processing load | |
if start_pos is not None: | |
log.debug("Seeking to %s...", start_pos) | |
seek_stream(stream, start_pos) | |
log.debug("Analyzing...") | |
while stream.isOpened(): | |
success, frame = stream.read() | |
if not success: | |
break | |
offset = stream.get( | |
cv.CAP_PROP_POS_MSEC) / 1000 # pylint: disable=no-member | |
# Use template-matching to find the frame signifying the end of content | |
# pylint: disable=no-member | |
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) | |
res = cv.matchTemplate(gray, template, cv.TM_CCOEFF_NORMED) | |
threshold = 0.8 | |
loc = np.where(res >= threshold) # pylint: disable=no-member | |
if len(loc[0]): # bool(loc[0]) gives undesired results | |
if match and (offset - match) > 1: | |
log.debug("Returning last frame in first match at %s", match) | |
return match | |
else: | |
match = offset | |
log.debug("Match at %s", match) | |
if match and not last_match: | |
log.debug("Returning first match: %s", match) | |
return match | |
log.debug("Returning last match (last offset: %s): %s", offset, match) | |
return match | |
def analyze_file(video_path, template_path, start_pos=0, last_match=None): | |
"""High-level wrapper to ease analyzing multiple videos in a single run""" | |
if last_match is None: | |
last_match = not os.path.splitext(template_path)[0].endswith('_first') | |
# pylint: disable=no-member | |
template = cv.imread(template_path, 0) | |
stream = cv.VideoCapture(video_path) | |
# TODO: Safety check that template size doesn't exceed video frame size | |
# TODO: Figure out how to deduplicate this | |
try: | |
end = (stream.get(cv.CAP_PROP_FRAME_COUNT) / | |
stream.get(cv.CAP_PROP_FPS)) | |
except ZeroDivisionError: | |
log.info("FPS is zero in %s. Returning None as end.", video_path) | |
end = None | |
try: | |
offset = find_frame(stream, template, start_pos, last_match) | |
finally: | |
stream.release() | |
cv.destroyAllWindows() # TODO: Do I need this anymore? | |
return offset, end | |
def make_edl(video_path, template_path, start_pos=0): | |
"""Highest-level wrapper to make it easy for other scripts to call this""" | |
edl_path = edl_path_for(video_path) | |
if has_edl(video_path, edl_path): | |
return | |
offset, _ = analyze_file(video_path, template_path, start_pos) | |
if offset is not None: | |
# TODO: Skip if (end - offset) < 3sec | |
video_name = os.path.basename(video_path) | |
if video_name.startswith('#'): # '#LoserDonald' is not a comment | |
video_name = './{}'.format(video_name) | |
video_name = video_name.encode('utf8') # %len% is byte-based | |
with open(edl_path, 'w') as edl: | |
edl.write(b"# mpv EDL v0\n%{}%{},0,{:f}".format( | |
len(video_name), video_name, offset)) | |
return bool(offset) | |
def make_with_fallbacks(path, templates, skip_to=-30, silent=False): | |
"""Apply make_edl to a fallback chain of templates""" | |
if len(templates) < 1: | |
log.error("len(templates) < 1") | |
return | |
# TODO: Do this properly | |
for tmpl in templates: | |
try: | |
log.info("Processing %r with %r", path, tmpl) | |
success = make_edl(path, tmpl, skip_to) | |
if success: | |
break | |
else: | |
raise Exception() | |
except Exception: # pylint: disable=broad-except | |
log.info("No match for %s in %s", tmpl, path) | |
else: | |
if not silent: | |
log.error("Failed to make EDL for %r", path) | |
def resolve_path_args(paths, filter_re, default=('.',)): | |
"""Unified code for resolving video and template arguments""" | |
if isinstance(paths, basestring): | |
paths = [paths] | |
results = [] | |
for path in paths or default: | |
if os.path.isdir(path): | |
results.extend(x for x in sorted(os.listdir(path)) | |
if filter_re.match(x)) | |
else: | |
results.append(path) | |
assert isinstance(results, list) | |
return results | |
def main(): | |
"""The main entry point, compatible with setuptools entry points.""" | |
# If we're running on Python 2, take responsibility for preventing | |
# output from causing UnicodeEncodeErrors. (Done here so it should only | |
# happen when not being imported by some other program.) | |
import sys | |
if sys.version_info.major < 3: | |
reload(sys) | |
sys.setdefaultencoding('utf-8') # pylint: disable=no-member | |
from argparse import ArgumentParser, RawTextHelpFormatter | |
parser = ArgumentParser(formatter_class=RawTextHelpFormatter, | |
description=__doc__.replace('\r\n', '\n').split('\n--snip--\n')[0]) | |
parser.add_argument('--version', action='version', | |
version="%%(prog)s v%s" % __version__) | |
parser.add_argument('-v', '--verbose', action="count", | |
default=2, help="Increase the verbosity. Use twice for extra effect") | |
parser.add_argument('-q', '--quiet', action="count", | |
default=0, help="Decrease the verbosity. Use twice for extra effect") | |
parser.add_argument('--cron', action='store_true', | |
default=False, help='Silence potentially routine error messages and ' | |
"set a niceness of 19 so OpenCL doesn't interfere with the desktop") | |
parser.add_argument('-t', '--template', action='append', | |
help="The frame to search for within the video stream. May be given " | |
"multiple times to specify a fallback chain. If a directory path " | |
"is provided, the image files within will be sorted by name " | |
"and added to the fallback chain. (default: the current directory" | |
")") | |
# TODO: Try 30, then back off to 60 if no match. | |
parser.add_argument('--skip-to', default=-60, type=int, | |
help="Seek to this position (in seconds) before processing to reduce " | |
"wasted CPU cycles. Negative values are relative to the end of " | |
"the file. (default: %(default)s)") | |
parser.add_argument('video', nargs='*', help="Path to video file " | |
"(The current working directory will be searched for " | |
"video files if none are provided)") | |
args = parser.parse_args() | |
# Set up clean logging to stderr | |
log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING, | |
logging.INFO, logging.DEBUG] | |
args.verbose = min(args.verbose - args.quiet, len(log_levels) - 1) | |
args.verbose = max(args.verbose, 0) | |
logging.basicConfig(level=log_levels[args.verbose], | |
format='%(levelname)s: %(message)s') | |
# Minimize CPU scheduler priority if --cron | |
if args.cron: | |
os.nice(19) | |
paths = resolve_path_args(args.video, video_ext_re) | |
templates = resolve_path_args(args.template, frame_ext_re) | |
log.debug("Using templates: %r", templates) | |
# Process with fallback template support | |
for path in paths: | |
if not has_edl(path): | |
make_with_fallbacks(path, templates, args.skip_to, args.cron) | |
if __name__ == '__main__': | |
main() | |
# vim: set sw=4 sts=4 expandtab : |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment