Skip to content

Instantly share code, notes, and snippets.

@ageis
Last active May 1, 2019 06:17
Show Gist options
  • Save ageis/322acd831078058d5938ec3c79a6895a to your computer and use it in GitHub Desktop.
Save ageis/322acd831078058d5938ec3c79a6895a to your computer and use it in GitHub Desktop.
Extract JPEG frames from video while applying optional magnification/cropping/sharpening/contrast enhancement; specify the duration/section or use keyframes/one frame per second, etc.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# coding: utf-8
from __future__ import unicode_literals
from __future__ import print_function
import argparse
import re
import json
import os
import signal
import sys
import numpy as np
import subprocess
from imutils.video import FileVideoStream
from imutils.video import FPS
from pick import pick
import imutils
import time
import cv2
from MediaInfo import MediaInfo
import traceback
if sys.version_info < (3, 0):
import Queue as queue
input = raw_input
else:
import queue
unicode = str
import chardet
import sh
import tqdm
class MediaInfo:
def __init__(self, **kwargs):
self.filename = kwargs.get("filename")
self.cmd = kwargs.get("cmd")
self.info = dict()
if self.filename == None:
self.filename = ""
if self.cmd == None:
for cmdpath in os.environ["PATH"].split(":"):
if os.path.isdir(cmdpath) and "mediainfo" in os.listdir(cmdpath):
self.cmd = cmdpath + "/mediainfo"
elif os.path.isdir(cmdpath) and "ffprobe" in os.listdir(cmdpath):
self.cmd = cmdpath + "/ffprobe"
if self.cmd == None:
self.cmd = ""
def getInfo(self):
if not os.path.exists(self.filename) or not os.path.exists(self.cmd):
return None
cmdName = os.path.basename(self.cmd)
if cmdName == "ffprobe":
self._ffprobeGetInfo()
elif cmdName == "mediainfo":
self._mediainfoGetInfo()
return self.info
def _ffprobeGetInfo(self):
cmd = (
self.cmd
#+ " -v 0 -loglevel quiet -print_format json -show_entries -show_error -count_frames -i "
+ " -v error -loglevel quiet -select_streams v:0 -show_format -show_streams -show_error -print_format json -i "
+ self.filename
)
outputBytes = ""
try:
outputBytes = subprocess.check_output(cmd, shell=True)
except subprocess.CalledProcessError as e:
return ""
outputText = outputBytes.decode("utf-8")
self.info = self._ffprobeGetInfoJson(outputText)
def _ffprobeGetInfoJson(self, sourceString):
mediaInfo = dict()
infoDict = dict()
try:
infoDict = json.loads(sourceString)
except json.JSONDecodeError as err:
return mediaInfo
mediaInfo["duration"] = infoDict.get("format").get("duration")
videoStreamIndex = None
for item in infoDict.get("streams"):
codec_type = item.get("codec_type")
if codec_type == "video":
videoStreamIndex = item.get("index")
mediaInfo["haveVideo"] = True
if mediaInfo.get("haveVideo"):
mediaInfo["videoDuration"] = infoDict.get("streams")[videoStreamIndex].get("duration")
mediaInfo["videoWidth"] = infoDict.get("streams")[videoStreamIndex].get("width")
mediaInfo["videoHeight"] = infoDict.get("streams")[videoStreamIndex].get("height")
mediaInfo["videoFrameRate"] = infoDict.get("streams")[videoStreamIndex].get(
"r_frame_rate"
)
mediaInfo["videoFrameCount"] = infoDict.get("streams")[videoStreamIndex].get(
"nb_read_frames"
)
return mediaInfo
def _mediainfoGetInfo(self):
prevPath = os.getcwd()
newPath = os.path.abspath(os.path.dirname(self.filename))
file = os.path.basename(self.filename)
cmd = self.cmd + " -f " + file
outputBytes = ""
try:
os.chdir(newPath)
try:
outputBytes = subprocess.check_output(cmd, shell=True)
except subprocess.CalledProcessError as e:
return ""
outputText = outputBytes.decode("utf-8")
except IOError:
os.chdir(prevPath)
return ""
finally:
os.chdir(prevPath)
self.info = self._mediainfoGetInfoRegex(outputText)
def _mediainfoGetInfoRegex(self, sourceString):
mediaInfo = dict()
general = re.search("(^General\n.*?\n\n)", sourceString, re.S)
if general:
generalInfo = general.group(0)
container = re.search("Format\s*:\s*([\w\_\-\\\/\. ]+)\n", generalInfo, re.S)
fileSize = re.search("File size\s*:\s*(\d+)\.?\d*\n", generalInfo, re.S)
duration = re.search("Duration\s*:\s*(\d+)\.?\d*\n", generalInfo, re.S)
mediaInfo["container"] = container.group(1)
mediaInfo["fileSize"] = fileSize.group(1)
mediaInfo["duration"] = (str)((float)(duration.group(1)) / 1000)
video = re.search("(\nVideo[\s\#\d]*\n.*?\n\n)", sourceString, re.S)
if video:
mediaInfo["haveVideo"] = 1
videoInfo = video.group(0)
videoDuration = re.search("Duration\s*:\s*(\d+)\.?\d*\n", videoInfo, re.S)
videoWidth = re.search("Width\s*:\s*(\d+)\n", videoInfo, re.S)
videoHeight = re.search("Height\s*:\s*(\d+)\n", videoInfo, re.S)
videoFrameRate = re.search("Frame rate\s*:\s*([\d\.]+)\n", videoInfo, re.S)
videoFrameCount = re.search("Frame count\s*:\s*(\d+)\.?\d*\n", videoInfo, re.S)
if videoDuration:
mediaInfo["videoDuration"] = (str)((float)(videoDuration.group(1)) / 1000)
if videoWidth:
mediaInfo["videoWidth"] = (int)(videoWidth.group(1))
if videoHeight:
mediaInfo["videoHeight"] = (int)(videoHeight.group(1))
if videoFrameRate:
mediaInfo["videoFrameRate"] = videoFrameRate.group(1)
if videoFrameCount:
mediaInfo["videoFrameCount"] = videoFrameCount.group(1)
return mediaInfo
class ProgressNotifier(object):
_DURATION_RX = re.compile("Duration: (\d{2}):(\d{2}):(\d{2})\.\d{2}")
_PROGRESS_RX = re.compile("time=(\d{2}):(\d{2}):(\d{2})\.\d{2}")
_SOURCE_RX = re.compile("from '(.*)':")
_FPS_RX = re.compile("(\d{2}\.\d{2}|\d{2}) fps")
@staticmethod
def _seconds(hours, minutes, seconds):
return (int(hours) * 60 + int(minutes)) * 60 + int(seconds)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.pbar is not None:
self.pbar.close()
def __init__(self, file=None):
self.lines = []
self.line_acc = []
self.duration = None
self.source = None
self.started = False
self.pbar = None
self.fps = None
self.file = file or sys.stderr
def __call__(self, char, stdin):
if not isinstance(char, unicode):
encoding = chardet.detect(char)["encoding"]
char = unicode(char, encoding)
if char in "\r\n":
line = self.newline()
if self.duration is None:
self.duration = self.get_duration(line)
if self.source is None:
self.source = self.get_source(line)
if self.fps is None:
self.fps = self.get_fps(line)
self.progress(line)
else:
self.line_acc.append(char)
if self.line_acc[-6:] == list("[y/N] "):
print("".join(self.line_acc), end="")
stdin.put(input() + "\n")
self.newline()
def newline(self):
line = "".join(self.line_acc)
self.lines.append(line)
self.line_acc = []
return line
def get_fps(self, line):
search = self._FPS_RX.search(line)
if search is not None:
return round(float(search.group(1)))
def get_duration(self, line):
search = self._DURATION_RX.search(line)
if search is not None:
return self._seconds(*search.groups())
return None
def get_source(self, line):
search = self._SOURCE_RX.search(line)
if search is not None:
return os.path.basename(search.group(1))
return None
def progress(self, line):
search = self._PROGRESS_RX.search(line)
if search is not None:
total = self.duration
current = self._seconds(*search.groups())
unit = " seconds"
if self.fps is not None:
unit = " frames"
current *= self.fps
total *= self.fps
if self.pbar is None:
self.pbar = tqdm.tqdm(
desc=self.source,
file=self.file,
total=total,
dynamic_ncols=True,
unit=unit,
ncols=0,
)
self.pbar.update(current - self.pbar.n)
def calculate_fps(filename):
if not os.path.exists(filename):
sys.stderr.write("ERROR: filename %r was not found!" % (filename))
sys.exit(1)
fps_results = []
fvs = FileVideoStream(filename).start()
timer = FPS().start()
timeout = time.time() + 1
while fvs.more():
fvs.read()
timer.update()
if time.time() >= timeout:
timer.stop()
fvs.stop()
fps_results.append({'OpenCV file video stream': int(np.round(timer.fps()))})
break
vid = cv2.VideoCapture(filename)
cv2.namedWindow = os.path.basename(sys.argv[0])
if not vid.isOpened():
sys.stderr.write("ERROR: Cannot read video file %r" % (filename))
sys.exit(1)
opencv_fps = vid.get(cv2.CAP_PROP_FPS)
fps_results.append({'OpenCV video capture properties': int(opencv_fps)})
# tracker = cv2.TrackerBoosting_create()
# bbox = cv2.selectROI(frame, False)
# tracker.init(frame, bbox)
timer = FPS().start()
timeout = time.time() + 1
while vid.isOpened():
frame = vid.read()
# ticks = cv2.getTickCount()
timer.update()
# frames = cv2.getTickFrequency() / (cv2.getTickCount() - ticks)
cv2.waitKey(1)
if time.time() >= timeout:
timer.stop()
vid.release()
cv2.destroyAllWindows()
break
fps_results.append({'OpenCV video capture approximation': int(np.round(timer.fps()))})
media_info = query_mediainfo(filename=filename, method='/usr/bin/mediainfo')
ffprobe_info = query_mediainfo(filename=filename, method='/usr/bin/ffprobe')
#fps_results.append({'MediaInfo': str(int(media_info['videoFrameRate'].split('.')[0]))})
#fps_results.append({'ffprobe': str(int(ffprobe_info['videoFrameRate'].split('/')[0]))})
fps_rates = []
for measurement in fps_results:
for num in measurement.values():
fps_rates.append(int(num))
custom_string = 'Select this option to input custom FPS value.'
auto_string = 'Select this option to automatically use the average of all framerates.'
fps_results.append(custom_string)
fps_results.append(auto_string)
title = 'Please choose the correct frames per second (FPS) from these sources:'
selected = pick(fps_results, title, multi_select=False, min_selection_count=1)
print(selected[1])
#sys.exit(0)
if selected[1] == 5:
fps_rate = input('Frames per second: ')
return fps_rate
elif selected[1] == 6:
avg_fps = int(np.round(float(sum(fps_rates))/len(fps_rates)))
else:
return selected[0]
def recurse_keys(df, indent = ' '):
for key in df.keys():
print(indent+str(key))
if isinstance(df[key], dict):
recurse_keys(df[key], indent+' ')
def get_frame_rate(filename):
if not os.path.exists(filename):
sys.stderr.write("ERROR: filename %r was not found!" % (filename,))
sys.exit(1)
return -1
out = subprocess.check_output(['ffprobe', filename , '-v', 'error' , '-select_streams','v:0','-print_format','flat','-show_entries','stream=avg_frame_rate','-of', 'default=noprint_wrappers=1:nokey=1'], encoding='utf8')
rate = str(out)
num = rate.split('/')[0]
return np.round(int(num))
def get_dimensions(filename):
if not os.path.exists(filename):
sys.stderr.write("ERROR: filename %r was not found!" % (filename,))
sys.exit(1)
return -1
vid = cv2.VideoCapture(filename)
return (int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)), int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)))
def query_mediainfo(**kwargs):
fn = kwargs.get('filename')
method = kwargs.get('method')
try:
info = MediaInfo(filename=fn,cmd=method)
except:
traceback.print_exc(file=sys.stdout)
sys.exit(1)
return info.getInfo()
def test_dimension_string(input):
DIMENSION_REGEX = re.compile("(\d+):(\d+):(\d+):(\d+)")
search = DIMENSION_REGEX.search(input)
if search is not None:
return True
else:
return False
def is_duration_string(input):
DURATION_REGEX_1 = re.compile("(\d{2}):(\d{2}):(\d{2})\.\d{2}")
DURATION_REGEX_2 = re.compile("(\d{2}):(\d{2}):(\d{2})")
search_1 = DURATION_REGEX_1.search(input)
search_2 = DURATION_REGEX_2.search(input)
if search_1 is not None:
try:
time.strptime(input, '%H:%M:%S.%f')
except ValueError:
return False
return True
elif search_2 is not None:
try:
time.strptime(input, '%H:%M:%S')
except ValueError:
return False
return True
else:
return False
def main(argv=None, stream=sys.stderr):
argv = argv or sys.argv[1:]
parser = argparse.ArgumentParser(description='Extract JPEG frames from input video')
parser.add_argument('--filename', required=True, dest='filename', type=str, help='path to video file')
parser.add_argument('--destination', required=False, dest='destination', default=os.getcwd(), type=str, help='directory to output images to')
parser.add_argument('--start', required=False, dest='start', default=None, type=str, help='time to start at')
parser.add_argument('--duration', required=False, dest='duration', default=None, type=str, help='total duration to cover, beginning from specified start time')
parser.add_argument('--sharpen', required=False, dest='sharpen', default=True, action='store_true', help='whether to sharpen video slightly')
parser.add_argument('--contrast', required=False, dest='contrast', default=False, action='store_true', help='whether to add contrast')
parser.add_argument('--crop', required=False, dest='crop', default=None, type=str, help='dimension to crop e.g. 800:600:0:0')
parser.add_argument('--zoom', required=False, dest='zoom', default=False, action='store_true', help='double the scale')
parser.add_argument('--keyframes', required=False, dest='keyframes', default=False, action='store_true', help='whether to grab only key frames')
parser.add_argument('--verbose', required=False, dest='verbose', default=False, action='store_true', help='enable verbose/debug output from ffmpeg')
parser.add_argument('--dimensions', required=False, dest='dimensions', default=False, action='store_true', help='determine and output width and height of video file and then exit—for determining crop amount')
parser.add_argument('--fps', required=False, dest='fps', default=None, type=int, help='specify custom FPS (sometimes our calculation is incorrect)')
parser.add_argument('--calculate', required=False, dest='calculate', default=False, action='store_true', help='query OpenCV, mediainfo and/or ffprobe for FPS rate')
parser.add_argument('--onepersec', required=False, dest='onepersec', default=False, action='store_true', help='extract only one frame per second')
args = parser.parse_args()
if args.dimensions and args.filename:
width, height = get_dimensions(args.filename)
sys.stdout.write("%r x %r" % (width, height))
sys.exit(0)
if args.calculate and args.filename:
calculated_fps = calculate_fps(args.filename)
args.fps = str(int(calculated_fps))
if args.destination == os.getcwd():
print('Using current working directory as default destination: ' + os.getcwd())
ffmpeg_opts = list()
filters = list()
if args.verbose:
ffmpeg_opts.append('-loglevel')
ffmpeg_opts.append('debug')
ffmpeg_opts.append('-v')
ffmpeg_opts.append('verbose')
if args.start or args.duration:
ffmpeg_opts.append('-accurate_seek')
if args.start is not None:
if not is_duration_string(args.start):
sys.stderr.write("ERROR: %r is not a correctly formatted time duration!" % (args.start))
sys.exit(1)
ffmpeg_opts.append('-ss')
ffmpeg_opts.append(args.start)
if args.duration is not None:
if not is_duration_string(args.duration):
sys.stderr.write("ERROR: %r is not a correctly formatted time duration!" % (args.duration))
sys.exit(1)
ffmpeg_opts.append('-t')
ffmpeg_opts.append(args.duration)
if not args.fps:
fps = str(get_frame_rate(args.filename))
else:
fps = str(args.fps)
ffmpeg_opts.append('-i')
ffmpeg_opts.append(args.filename)
if args.onepersec:
ffmpeg_opts.append('-r')
ffmpeg_opts.append('1')
ffmpeg_opts.append('-framerate')
ffmpeg_opts.append(fps)
else:
filters.append('fps=' + fps)
if args.keyframes:
filters.append('select=eq(pict_type\,I)')
ffmpeg_opts.append('-vsync')
ffmpeg_opts.append('vfr')
if args.crop:
if not test_dimension_string(args.crop):
sys.stderr.write("ERROR: %r is not a correctly formatted cropping string!" % (args.crop))
sys.exit(1)
filters.append('crop=' + args.crop)
#filters.append('unsharp=5:5:1.5:5:5:0.0')
#filters.append('unsharp=5:7:2.0:7:7:0.0')
#filters.append('smartblur=lr=2.00:ls=-0.90:lt=-5.0:cr=0.5:cs=1.0:ct=1.5')
#filters.append('convolution="0 -1 0 -1 5 -1 0 -1 0:0 -1 0 -1 5 -1 0 -1 0:0 -1 0 -1 5 -1 0 -1 0:0 -1 0 -1 5 -1 0 -1 0"')
#filters.append('convolution="0 0 0 -1 1 0 0 0 0:0 0 0 -1 1 0 0 0 0:0 0 0 -1 1 0 0 0 0:0 0 0 -1 1 0 0 0 0:5:1:1:1:0:128:128:128"')
if args.zoom:
filters.append('scale=iw*2:ih*2')
if args.sharpen:
filters.append('unsharp=5:5:1.0:5:5:0.0')
if args.contrast:
filters.append('eq=contrast=1.0:gamma=0.95:brightness=0:saturation=1.0')
filter_opts = ",".join(filters)
ffmpeg_opts.append('-filter:v')
ffmpeg_opts.append(filter_opts)
if not os.path.exists(args.destination):
os.makedirs(args.destination)
dest = args.destination + '/' + os.path.splitext(os.path.basename(args.filename))[0] + '%d.jpg'
print(ffmpeg_opts)
if {"-h", "-help", "--help"}.intersection(argv):
sh.ffmpeg(help=True, _fg=True)
return 0
try:
with ProgressNotifier(file=stream) as notifier:
sh.ffmpeg(
*ffmpeg_opts,
'-q:v',
'1',
dest,
_in=queue.Queue(),
_err=notifier,
_out_bufsize=0,
_err_bufsize=0,
_in_bufsize=0,
#_long_sep=' ',
_no_out=False,
_no_pipe=True,
_tty_in=True
)
except sh.ErrorReturnCode as err:
print(err)
print(err.stderr)
print(notifier.lines[-1], file=stream)
return err.exit_code
except KeyboardInterrupt:
print("Exiting.", file=stream)
return signal.SIGINT + 128 # POSIX standard
except Exception as e:
print(e)
else:
return 0
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment