Skip to content

Instantly share code, notes, and snippets.



Last active Mar 11, 2019
What would you like to do?
OpenCV script to match a recurring "boundary between content and post-roll ad" frame and write an EDL to stop playback there
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# pylint: disable=line-too-long
"""Utility for generating MPV EDL files to skip recurring post-roll ads.
1. Use MPV's screenshot hotkey to extract a frame that's consistently present
at the boundary between the content and the ad.
2. Run this script with the screenshot specified via the --template argument.
3. Play the resulting EDL file in MPV.
(MPV EDL files are like playlists which can specify pieces of video files
rather than entire files)
- Gather and maintain statistics on which templates matched in a given folder
so that the order in which templates are tried can learn as a way to optimize
the total runtime.
- Read
- Consider rewriting in Rust to ease shaking the bugs out:
Sources used to develop this:
Stuff I tried but threw out because of a bug outside my control:
Sources used during development:
""" # NOQA
from __future__ import (absolute_import, division, print_function,
with_statement, unicode_literals)
# Package metadata. __version__ is interpolated into the --version output
# built in main(); __doc__ (the module docstring) supplies the --help text.
__author__ = "Stephan Sokolow (deitarion/SSokolow)"
__appname__ = "TYT Post-Roll Ad Eliminator"
__version__ = "0.1"
__license__ = "MIT"
# Shell globs compiled below into video_ext_re / frame_ext_re, which are used
# to filter directory listings for video files and template images.
VIDEO_GLOBS = ['*.mp4', '*.webm']
FRAME_GLOBS = ['*.png', '*.jpg']
import fnmatch, logging, os, re
import cv2 as cv
import numpy as np
# Module-level logger; its level and format are configured in main().
log = logging.getLogger(__name__)
# Borrowed from:
# (Revision: bc784a5fe3c2d4275fef5ec16612bd67142eb0f8)
# TODO: Put this up on PyPI so I don't have to copy it around.
def multiglob_compile(globs, prefix=False, re_flags=0):
    """Generate a single "A or B or C" regex from a list of shell globs.

    :param globs: Patterns to be processed by :mod:`fnmatch`.
    :type globs: iterable of :class:`~__builtins__.str`
    :param prefix: If ``True``, then :meth:`~re.RegexObject.match` will
        perform prefix matching rather than exact string matching.
    :type prefix: :class:`~__builtins__.bool`
    :param re_flags: Flags handed to :func:`re.compile` (e.g. ``re.I``).
        Pass this by keyword; the second positional slot is ``prefix``.
    :type re_flags: :class:`~__builtins__.int`
    :rtype: :class:`re.RegexObject`
    """
    # Materialise first: a generator is always truthy, so an empty one would
    # otherwise slip past the emptiness check and compile to a match-anything
    # pattern.
    globs = list(globs) if globs else []
    if not globs:
        # An empty globs list should only match empty strings
        return re.compile('^$')
    if prefix:
        globs = [x + '*' for x in globs]
    return re.compile('|'.join(fnmatch.translate(x) for x in globs), re_flags)
# Case-insensitive filename matchers for the supported video and template
# image extensions. ``re.I`` has to be passed by keyword: the second
# positional parameter of multiglob_compile() is ``prefix``, so passing it
# positionally silently enabled prefix matching and left the patterns
# case-sensitive.
video_ext_re = multiglob_compile(VIDEO_GLOBS, re_flags=re.I)
frame_ext_re = multiglob_compile(FRAME_GLOBS, re_flags=re.I)
def edl_path_for(video_path):
    """Single place where EDL-path lookup is defined.

    :param video_path: Path to a video file.
    :returns: ``video_path`` with its final extension replaced by
        ``.mpv.edl`` (the suffix is simply appended if there is no extension).
    """
    base, _ext = os.path.splitext(video_path)
    return base + '.mpv.edl'
def has_edl(video_path, edl_path=None):
    """Unified definition of how to check whether we should skip a file.

    :param video_path: Path to the video being considered.
    :param edl_path: Path of the EDL to look for. When omitted (or empty) it
        is derived from ``video_path`` via :func:`edl_path_for`.
    :returns: ``True`` if the EDL file already exists, ``False`` otherwise.
    """
    edl_path = edl_path or edl_path_for(video_path)
    # TODO: Also handle the "is EDL" case
    if os.path.exists(edl_path):
        log.info("EDL already exists. Skipping %r", video_path)
        return True
    return False
def seek_stream(stream, offset):
    """Seek ``stream`` to ``offset`` seconds.

    If offset is negative, it will be treated as a "seconds from the end"
    value, analogous to Python list indexing. A negative offset that reaches
    back past the start of the video is clamped to the first frame.

    :param stream: An opened ``cv.VideoCapture``-style object providing
        ``get()`` and ``set()``.
    :param offset: Position in seconds (an offset of 0 seeks to frame 0).
    """
    fps = stream.get(cv.CAP_PROP_FPS)  # pylint: disable=no-member
    if offset > 0:
        offset *= fps
    elif offset < 0:
        frame_count = stream.get(
            cv.CAP_PROP_FRAME_COUNT)  # pylint: disable=no-member
        # Don't ask for a negative frame index when the video is shorter
        # than the requested distance from the end.
        offset = max(0, frame_count + (offset * fps))

    # It seems seeking via CAP_PROP_POS_MSEC doesn't work
    stream.set(cv.CAP_PROP_POS_FRAMES, offset)  # pylint: disable=no-member
def find_frame(stream, template, start_pos=None, last_match=None,
               threshold=0.8):
    """Given a video stream and a frame, find its offset in seconds.

    If start_pos is negative, it will be treated as a "seconds from the end"
    value, analogous to Python list indexing.

    Does NOT release the stream when finished; the caller remains
    responsible for releasing it.

    :param stream: An opened ``cv.VideoCapture``-style object.
    :param template: Grayscale template image to look for in each frame.
    :param start_pos: Position in seconds to seek to before scanning, or
        ``None`` to scan from the stream's current position.
    :param last_match: If false, return the offset of the first matching
        frame. Otherwise keep scanning and return the last matching frame of
        the first run of matches (a run ends when the next matching frame is
        more than one second later) or, failing that, the last match seen
        before the stream ran out.
    :param threshold: Minimum ``TM_CCOEFF_NORMED`` score for a frame to count
        as a match.

    Returns the offset in seconds or None for no match.
    """
    offset, match = 0, None

    # Seek to near the end to minimize processing load
    if start_pos is not None:
        log.debug("Seeking to %s...", start_pos)
        seek_stream(stream, start_pos)

    while stream.isOpened():
        success, frame = stream.read()
        if not success:
            break  # End of stream (or a read error)

        offset = stream.get(
            cv.CAP_PROP_POS_MSEC) / 1000  # pylint: disable=no-member

        # Use template-matching to find the frame signifying the end of content
        # pylint: disable=no-member
        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        res = cv.matchTemplate(gray, template, cv.TM_CCOEFF_NORMED)
        loc = np.where(res >= threshold)  # pylint: disable=no-member
        if not len(loc[0]):  # bool(loc[0]) gives undesired results
            continue

        # Compare against None rather than relying on truthiness so that a
        # match at 0.0 seconds is not mistaken for "no match yet".
        if match is not None and (offset - match) > 1:
            log.debug("Returning last frame in first match at %s", match)
            return match

        match = offset
        log.debug("Match at %s", match)
        if not last_match:
            log.debug("Returning first match: %s", match)
            return match

    log.debug("Returning last match (last offset: %s): %s", offset, match)
    return match
def analyze_file(video_path, template_path, start_pos=0, last_match=None):
    """High-level wrapper to ease analyzing multiple videos in a single run.

    :param video_path: Path of the video to scan.
    :param template_path: Path of the template image to search for.
    :param start_pos: Position in seconds handed to :func:`find_frame`
        (negative values count back from the end).
    :param last_match: Passed to :func:`find_frame`. When ``None`` it
        defaults to ``True`` unless the template's filename (minus extension)
        ends in ``_first``.
    :returns: ``(offset, end)`` where ``offset`` is the match position in
        seconds (or ``None``) and ``end`` is frame count divided by FPS
        (or ``None`` if the FPS is reported as zero).
    """
    if last_match is None:
        last_match = not os.path.splitext(template_path)[0].endswith('_first')

    # pylint: disable=no-member
    template = cv.imread(template_path, 0)
    stream = cv.VideoCapture(video_path)
    try:
        # TODO: Safety check that template size doesn't exceed video frame size

        # TODO: Figure out how to deduplicate this
        try:
            end = (stream.get(cv.CAP_PROP_FRAME_COUNT) /
                   stream.get(cv.CAP_PROP_FPS))
        except ZeroDivisionError:
            log.info("FPS is zero in %s. Returning None as end.", video_path)
            end = None

        offset = find_frame(stream, template, start_pos, last_match)
    finally:
        # find_frame() leaves the stream open, so close it here even if the
        # scan raised.
        stream.release()
        cv.destroyAllWindows()  # TODO: Do I need this anymore?
    return offset, end
def make_edl(video_path, template_path, start_pos=0):
    """Highest-level wrapper to make it easy for other scripts to call this.

    Looks for the template in the video and, on a match, writes an mpv EDL
    next to the video that plays it from 0 up to the matched offset.

    :param video_path: Path of the video to process.
    :param template_path: Path of the template image to search for.
    :param start_pos: Position in seconds at which to start scanning
        (negative values count back from the end).
    :returns: ``True`` if this call wrote an EDL; ``False`` if the EDL
        already existed or the template was not found.
    """
    edl_path = edl_path_for(video_path)
    if has_edl(video_path, edl_path):
        return False

    offset, _ = analyze_file(video_path, template_path, start_pos)
    if offset is None:
        return False

    # TODO: Skip if (end - offset) < 3sec
    video_name = os.path.basename(video_path)
    if not isinstance(video_name, bytes):
        video_name = video_name.encode('utf8')  # %len% is byte-based
    if video_name.startswith(b'#'):  # '#LoserDonald' is not a comment
        video_name = b'./' + video_name

    # Assemble the file as bytes and write it in binary mode so the byte
    # length announced in %len% is exactly what lands on disk, on both
    # Python 2 and Python 3.
    header = "# mpv EDL v0\n%{}%".format(len(video_name)).encode('ascii')
    segment = ",0,{:f}".format(offset).encode('ascii')
    with open(edl_path, 'wb') as edl:
        edl.write(header + video_name + segment)

    # A match at 0.0 seconds still produced an EDL, so report success based
    # on "was there a match", not on the offset's truthiness.
    return True
def make_with_fallbacks(path, templates, skip_to=-30, silent=False):
    """Apply make_edl to a fallback chain of templates.

    Templates are tried in order and processing stops at the first one for
    which :func:`make_edl` reports success.

    :param path: Path of the video to process.
    :param templates: Sequence of template image paths to try.
    :param skip_to: Start position in seconds passed to :func:`make_edl`.
    :param silent: If true, don't log an error when every template fails.
    :returns: ``True`` if an EDL was made, ``False`` otherwise.
    """
    if len(templates) < 1:
        log.error("len(templates) < 1")
        # TODO: Do this properly

    for tmpl in templates:
        log.info("Processing %r with %r", path, tmpl)
        try:
            success = make_edl(path, tmpl, skip_to)
        except Exception:  # pylint: disable=broad-except
            # Keep going down the fallback chain, but leave a trace of what
            # went wrong instead of disguising it as an ordinary non-match.
            log.debug("Error while matching %s against %s", tmpl, path,
                      exc_info=True)
            success = False

        if success:
            return True
        log.info("No match for %s in %s", tmpl, path)

    if not silent:
        log.error("Failed to make EDL for %r", path)
    return False
def resolve_path_args(paths, filter_re, default=('.',)):
    """Unified code for resolving video and template arguments.

    :param paths: A single path, a list of paths, or ``None``/empty to fall
        back to ``default``.
    :param filter_re: Compiled regex applied (via ``match``) to the entry
        names of any directory in ``paths``; explicit file paths are not
        filtered.
    :param default: Paths to use when ``paths`` is empty.
    :returns: A list of paths. Directories are expanded to their matching
        entries, sorted by name; everything else is passed through as-is.
    """
    try:
        string_types = (basestring,)  # pylint: disable=undefined-variable
    except NameError:  # Python 3 has no basestring
        string_types = (str, bytes)

    if isinstance(paths, string_types):
        paths = [paths]

    results = []
    for path in paths or default:
        if not os.path.isdir(path):
            results.append(path)
            continue
        for name in sorted(os.listdir(path)):
            if not filter_re.match(name):
                continue
            # os.listdir() yields bare names; re-attach the directory so the
            # result is usable from any working directory. Entries of '.'
            # stay bare, as they are already valid relative paths.
            if path == os.curdir:
                results.append(name)
            else:
                results.append(os.path.join(path, name))
    return results
def main():
    """The main entry point, compatible with setuptools entry points.

    Parses the command line, configures logging to stderr, resolves the video
    and template arguments, and runs :func:`make_with_fallbacks` on every
    video that does not already have an EDL.
    """
    # If we're running on Python 2, take responsibility for preventing
    # output from causing UnicodeEncodeErrors. (Done here so it should only
    # happen when not being imported by some other program.)
    import sys
    if sys.version_info.major < 3:
        # site.py removes sys.setdefaultencoding during startup; reloading
        # the module makes it available again.
        reload(sys)  # pylint: disable=undefined-variable
        sys.setdefaultencoding('utf-8')  # pylint: disable=no-member

    from argparse import ArgumentParser, RawTextHelpFormatter
    parser = ArgumentParser(formatter_class=RawTextHelpFormatter,
        description=__doc__.replace('\r\n', '\n').split('\n--snip--\n')[0])
    parser.add_argument('--version', action='version',
        version="%%(prog)s v%s" % __version__)
    parser.add_argument('-v', '--verbose', action="count",
        default=2, help="Increase the verbosity. Use twice for extra effect")
    parser.add_argument('-q', '--quiet', action="count",
        default=0, help="Decrease the verbosity. Use twice for extra effect")
    parser.add_argument('--cron', action='store_true',
        default=False, help='Silence potentially routine error messages and '
        "set a niceness of 19 so OpenCL doesn't interfere with the desktop")
    parser.add_argument('-t', '--template', action='append',
        help="The frame to search for within the video stream. May be given "
        "multiple times to specify a fallback chain. If a directory path "
        "is provided, the image files within will be sorted by name "
        "and added to the fallback chain. (default: the current directory)")
    # TODO: Try 30, then back off to 60 if no match.
    parser.add_argument('--skip-to', default=-60, type=int,
        help="Seek to this position (in seconds) before processing to reduce "
             "wasted CPU cycles. Negative values are relative to the end of "
             "the file. (default: %(default)s)")
    parser.add_argument('video', nargs='*', help="Path to video file "
                        "(The current working directory will be searched for "
                        "video files if none are provided)")

    args = parser.parse_args()

    # Set up clean logging to stderr
    log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING,
                  logging.INFO, logging.DEBUG]
    # -v and -q shift an index into log_levels; clamp it to the valid range.
    args.verbose = min(args.verbose - args.quiet, len(log_levels) - 1)
    args.verbose = max(args.verbose, 0)
    logging.basicConfig(level=log_levels[args.verbose],
                        format='%(levelname)s: %(message)s')

    # Minimize CPU scheduler priority if --cron
    if args.cron:
        os.nice(19)

    paths = resolve_path_args(args.video, video_ext_re)
    templates = resolve_path_args(args.template, frame_ext_re)
    log.debug("Using templates: %r", templates)

    # Process with fallback template support
    for path in paths:
        if not has_edl(path):
            make_with_fallbacks(path, templates, args.skip_to, args.cron)
# Run the command-line interface only when executed as a script, not when
# imported by another program.
if __name__ == '__main__':
    main()

# vim: set sw=4 sts=4 expandtab :
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment