Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
OpenCV script to match a recurring "boundary between content and post-roll ad" frame and write an EDL to stop playback there
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# pylint: disable=line-too-long
"""Utility for generating MPV EDL files to skip recurring post-roll ads.
Instructions:
1. Use MPV's screenshot hotkey to extract a frame that's consistently present
at the boundary between the content and the ad.
2. Run this script with the screenshot specified via the --template argument.
3. Play the resulting EDL file in MPV.
(MPV EDL files are like playlists which can specify pieces of video files
rather than entire files)
--snip--
Sources used to develop this:
- https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html#playing-video-from-file
- https://pythonprogramming.net/template-matching-python-opencv-tutorial/
- http://stackoverflow.com/q/9629071
- http://stackoverflow.com/a/10979030
- http://stackoverflow.com/a/2981202
- https://github.com/mpv-player/mpv/blob/master/DOCS/edl-mpv.rst
- http://smplayer.sourceforge.net/en/mpv
Stuff I tried but threw out because of a bug outside my control:
- http://www.mplayerhq.hu/DOCS/HTML/en/edl.html
- http://forum.smplayer.info/viewtopic.php?p=15706#p15706
Sources used during development:
- http://www.pyimagesearch.com/2016/03/07/transparent-overlays-with-opencv/
""" # NOQA
from __future__ import (absolute_import, division, print_function,
with_statement, unicode_literals)
# Package metadata. __version__ is interpolated into the --version output
# built in main().
__author__ = "Stephan Sokolow (deitarion/SSokolow)"
__appname__ = "TYT Post-Roll Ad Eliminator"
__version__ = "0.1"
__license__ = "MIT"
# Shell-style globs, compiled into regexes further down with
# multiglob_compile(). main() uses the compiled forms to pick video files out
# of the current directory and template images out of template directories.
VIDEO_GLOBS = ['*.mp4', '*.webm']
FRAME_GLOBS = ['*.png', '*.jpg']
import fnmatch, logging, os, re
import cv2
import cv2.cv as cv # pylint: disable=no-name-in-module,import-error
import numpy as np
# Module-level logger shared by every function in this file; output handling
# is configured by logging.basicConfig() in main().
log = logging.getLogger(__name__)
# Borrowed from:
# https://github.com/ssokolow/game_launcher/blob/master/src/util/common.py
# (Revision: bc784a5fe3c2d4275fef5ec16612bd67142eb0f8)
# TODO: Put this up on PyPI so I don't have to copy it around.
def multiglob_compile(globs, prefix=False, re_flags=0):
    """Generate a single "A or B or C" regex from a list of shell globs.

    :param globs: Patterns to be processed by :mod:`fnmatch`.
    :type globs: iterable of :class:`~__builtins__.str`
    :param prefix: If ``True``, then :meth:`~re.RegexObject.match` will
        perform prefix matching rather than exact string matching.
    :type prefix: :class:`~__builtins__.bool`
    :param re_flags: Flags handed to :func:`re.compile` for the combined
        pattern (e.g. ``re.I``). Pass it by keyword; the second positional
        slot is ``prefix``.
    :type re_flags: :class:`~__builtins__.int`

    :rtype: :class:`re.RegexObject`
    """
    # Materialize first: a one-shot iterable (e.g. a generator) is always
    # truthy, so the emptiness check below would otherwise be skipped and an
    # empty generator would compile '' -- a pattern that matches everything.
    globs = list(globs)
    if not globs:
        # An empty globs list should only match empty strings
        return re.compile('^$')
    if prefix:
        # A trailing '*' lets each glob match any suffix
        globs = [x + '*' for x in globs]
    return re.compile('|'.join(fnmatch.translate(x) for x in globs), re_flags)
# Case-insensitive filename matchers built from the glob lists above.
# The flag has to be passed by keyword: the second positional parameter of
# multiglob_compile() is `prefix`, so a positional re.I would silently turn
# on prefix matching and leave the regex case-sensitive.
video_ext_re = multiglob_compile(VIDEO_GLOBS, re_flags=re.I)
frame_ext_re = multiglob_compile(FRAME_GLOBS, re_flags=re.I)
def seek_stream(stream, offset):
    """Seek a video stream to a position given in seconds.

    If offset is negative, it will be treated as a "seconds from the end"
    value, analogous to Python list indexing. A negative offset that reaches
    back past the start of the stream is clamped to the first frame.

    :param stream: Object exposing ``get(prop)`` / ``set(prop, value)``
        (in this file, a ``cv2.VideoCapture``).
    :param offset: Position in seconds; negative values count from the end.
    """
    fps = stream.get(cv.CV_CAP_PROP_FPS)
    if offset > 0:
        target = offset * fps
    elif offset < 0:
        frame_count = stream.get(cv.CV_CAP_PROP_FRAME_COUNT)
        # Never hand the backend a negative frame index when the clip is
        # shorter than the requested "seconds from the end".
        target = max(0, frame_count + (offset * fps))
    else:
        target = offset

    # It seems seeking via CV_CAP_PROP_POS_MSEC doesn't work
    stream.set(cv.CV_CAP_PROP_POS_FRAMES, target)
def find_frame(stream, template, start_pos=None, last_match=False,
               threshold=0.8):
    """Given a video stream and a frame, find its offset in seconds.

    If start_pos is negative, it will be treated as a "seconds from the end"
    value, analogous to Python list indexing.

    Does NOT release the stream when finished so it can be used in a manner
    similar to file.seek().

    :param stream: Opened video stream to read frames from.
    :param template: Grayscale image to look for; each frame is converted to
        grayscale before being compared against it.
    :param start_pos: Position in seconds to seek to before scanning, or
        ``None`` to scan from the stream's current position.
    :param last_match: If true, keep scanning to the end of the stream and
        report the last matching frame instead of the first.
    :param threshold: Minimum ``TM_CCOEFF_NORMED`` score for a frame to count
        as a match.

    Returns the offset in seconds or None for no match.
    """
    offset, match = 0, None

    # Seek to near the end to minimize processing load
    if start_pos is not None:
        log.debug("Seeking to %s...", start_pos)
        seek_stream(stream, start_pos)

    log.debug("Analyzing...")
    while stream.isOpened():
        success, frame = stream.read()
        if not success:
            break
        offset = stream.get(cv.CV_CAP_PROP_POS_MSEC) / 1000

        # Use template-matching to find the frame signifying the end of content
        # pylint: disable=no-member
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        res = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
        if not np.any(res >= threshold):  # pylint: disable=no-member
            continue

        match = offset
        log.debug("Match at %s", match)
        # Decide inside the match branch rather than testing the truthiness
        # of `match`: an offset of 0.0 is a valid match but is falsy.
        if not last_match:
            log.debug("Returning first match: %s", match)
            return match

    log.debug("Returning last match (last offset: %s): %s", offset, match)
    return match
def analyze_file(video_path, template_path, start_pos=0, last_match=False):
    """High-level wrapper to ease analyzing multiple videos in a single run.

    :param video_path: Path of the video to scan.
    :param template_path: Path of the image to look for in the video.
    :param start_pos: Forwarded to :func:`find_frame` (seconds; negative
        values are relative to the end of the video).
    :param last_match: Forwarded to :func:`find_frame`.

    :returns: ``(offset, end)``. ``offset`` is the match position in seconds,
        or ``None`` if nothing matched. ``end`` is frame count divided by
        FPS, or ``None`` when the FPS is reported as zero.
    :raises IOError: If the template image could not be loaded.
    """
    # pylint: disable=no-member
    template = cv2.imread(template_path, 0)  # flag 0: load as grayscale
    if template is None:
        # imread() signals failure by returning None rather than raising;
        # fail here with a clear message instead of deep inside matchTemplate.
        raise IOError("Could not load template image: %r" % (template_path,))

    stream = cv2.VideoCapture(video_path)
    try:
        # TODO: Figure out how to deduplicate this
        try:
            end = (stream.get(cv.CV_CAP_PROP_FRAME_COUNT) /
                   stream.get(cv.CV_CAP_PROP_FPS))
        except ZeroDivisionError:
            log.info("FPS is zero in %s. Returning None as end.", video_path)
            end = None

        offset = find_frame(stream, template, start_pos, last_match)
    finally:
        # Release the capture even if probing or scanning raised
        stream.release()
        cv2.destroyAllWindows()  # TODO: Do I need this anymore?
    return offset, end
def make_edl(video_path, template_path, start_pos=0):
    """Highest-level wrapper to make it easy for other scripts to call this.

    Looks for the last frame matching the template and, if one is found,
    writes ``<video path without extension>.mpv.edl`` describing the video
    from position 0 up to the matched offset.

    :param video_path: Path of the video to analyze.
    :param template_path: Path of the boundary-frame image.
    :param start_pos: Seconds to seek to before scanning (negative values are
        relative to the end of the video).

    :returns: ``None`` if an EDL already exists (nothing is analyzed),
        ``True`` if an EDL was written, ``False`` if no frame matched.
    """
    edl_path = os.path.splitext(video_path)[0] + '.mpv.edl'
    if os.path.exists(edl_path):
        log.info("EDL already exists. Skipping %r", video_path)
        return None

    offset, _ = analyze_file(video_path, template_path, start_pos, True)
    if offset is None:
        return False

    # TODO: Skip if (end - offset) < 3sec
    video_name = os.path.basename(video_path)
    # %len% is byte-based, so work on the encoded name. Names that are
    # already byte strings (e.g. Python 2 argv) are used as-is instead of
    # being pushed through an implicit decode.
    if not isinstance(video_name, bytes):
        video_name = video_name.encode('utf8')
    if video_name.startswith(b'#'):  # '#LoserDonald' is not a comment
        video_name = b'./' + video_name

    header = "# mpv EDL v0\n%{}%".format(len(video_name)).encode('ascii')
    segment = ",0,{:f}".format(offset).encode('ascii')
    # Binary mode: the payload is assembled as bytes so the length prefix
    # stays exact and the same code runs on Python 2 and 3.
    with open(edl_path, 'wb') as edl:
        edl.write(header + video_name + segment)

    # An EDL was written, so report success even if the offset is 0.0
    return True
def main():
    """The main entry point, compatible with setuptools entry points.

    Parses the command line, sets up logging, collects the videos and
    template images to use, then tries each template in order against each
    video until one yields an EDL.
    """
    # If we're running on Python 2, take responsibility for preventing
    # output from causing UnicodeEncodeErrors. (Done here so it should only
    # happen when not being imported by some other program.)
    import sys
    if sys.version_info.major < 3:
        reload(sys)  # pylint: disable=undefined-variable
        sys.setdefaultencoding('utf-8')  # pylint: disable=no-member

    from argparse import ArgumentParser, RawTextHelpFormatter
    # Only the part of the module docstring before the --snip-- marker is
    # shown as the --help description.
    parser = ArgumentParser(formatter_class=RawTextHelpFormatter,
        description=__doc__.replace('\r\n', '\n').split('\n--snip--\n')[0])
    parser.add_argument('--version', action='version',
        version="%%(prog)s v%s" % __version__)
    parser.add_argument('-v', '--verbose', action="count",
        default=2, help="Increase the verbosity. Use twice for extra effect")
    parser.add_argument('-q', '--quiet', action="count",
        default=0, help="Decrease the verbosity. Use twice for extra effect")
    parser.add_argument('--cron', action='store_true',
        default=False, help='Silence potentially routine error messages and '
        "set a niceness of 19 so OpenCL doesn't interfere with the desktop")
    parser.add_argument('-t', '--template', action='append',
        help="The frame to search for within the video stream. May be given "
             "multiple times to specify a fallback chain. If a directory path "
             "is provided, the image files within will be sorted by name "
             "and added to the fallback chain. (default: the current directory"
             ")")
    parser.add_argument('--skip-to', default=-30, type=int,
        help="Seek to this position (in seconds) before processing to reduce "
             "wasted CPU cycles. Negative values are relative to the end of "
             "the file. (default: %(default)s)")
    parser.add_argument('video', nargs='*', help="Path to video file "
                        "(The current working directory will be searched for "
                        "video files if none are provided)")

    args = parser.parse_args()

    # Set up clean logging to stderr
    log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING,
                  logging.INFO, logging.DEBUG]
    # Clamp (verbose - quiet) into the valid index range of log_levels
    args.verbose = min(args.verbose - args.quiet, len(log_levels) - 1)
    args.verbose = max(args.verbose, 0)
    logging.basicConfig(level=log_levels[args.verbose],
                        format='%(levelname)s: %(message)s')

    # Minimize CPU scheduler priority if --cron
    if args.cron:
        os.nice(19)

    # Glob for videos if no positional args were given
    paths = args.video
    if not paths:
        paths = [x for x in os.listdir('.') if video_ext_re.match(x)]

    # Resolve directories in the template list and default to ['.']
    templates = []
    for tmpl in args.template or ['.']:
        if os.path.isdir(tmpl):
            # os.listdir() yields bare names; join them back onto the
            # directory so templates outside the working directory load.
            templates.extend(os.path.join(tmpl, x)
                             for x in sorted(os.listdir(tmpl))
                             if frame_ext_re.match(x))
        else:
            templates.append(tmpl)
    log.debug("Using templates: %r", templates)

    # Process with fallback template support
    for path in paths:
        for tmpl in templates:
            log.info("Processing %r with %r", path, tmpl)
            try:
                success = make_edl(path, tmpl, args.skip_to)
            except Exception:  # pylint: disable=broad-except
                # Deliberate best-effort: a failure with one template must
                # not stop the fallback chain. Keep the traceback available
                # at debug verbosity instead of discarding it.
                log.debug("Error while processing %r with %r", path, tmpl,
                          exc_info=True)
                success = False

            # make_edl() returns None when the EDL already exists; that is a
            # skip, not a failure, so stop trying templates for this video.
            if success or success is None:
                break
            log.info("No match for %s in %s", tmpl, path)
        else:
            # Every template was tried and none produced an EDL
            if not args.cron:
                log.error("Failed to make EDL for %r", path)
# Run the command-line interface only when executed as a script, so that
# importing this module (e.g. to call make_edl) has no side effects.
if __name__ == '__main__':
    main()
# vim: set sw=4 sts=4 expandtab :
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment