Skip to content

Instantly share code, notes, and snippets.

@JustinShenk
Last active October 18, 2022 21:01
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JustinShenk/f4ba14a271202700f3251e73f6150696 to your computer and use it in GitHub Desktop.
Save JustinShenk/f4ba14a271202700f3251e73f6150696 to your computer and use it in GitHub Desktop.
Compare Videos with Lucas-Kanade Optical Flow Parameters
import subprocess
import sys

# Parameter sweep driver: runs lk_explore.py once per entry in `cmds`, each
# time overriding a single parameter, and prints whatever the run wrote to
# stdout.

# Video that every parameter variation is run against.
videofile = 'crowd_trimmed.mp4'

# One command-line override per run. Each entry is unique: repeating an entry
# would only re-run an identical invocation.
cmds = [
    '--maxCorners=10', '--maxCorners=50', '--maxCorners=100',
    '--qualityLevel=0.1', '--qualityLevel=0.8', '--minDistance=2',
    '--minDistance=40', '--winSize=5', '--winSize=100', '--blockSize=2',
    '--blockSize=20', '--criteria_params1=5', '--criteria_params1=20',
    '--criteria_params2=0.01', '--criteria_params2=0.08',
    '--criteria_params2=0.2', '--criteria_params1=40', '--winSize=400',
    '--minDistance=100', '--qualityLevel=0.95',
    '--blockSize=100', '--blockSize=1', '--maxLevel=0', '--maxLevel=4',
    '--winSize=7', '--blockSize=3', '--winSize=3', '--maxCorners=5',
    '--maxCorners=500',
]

for cmd in cmds:
    # sys.executable keeps the child on the same interpreter (and therefore
    # the same installed packages) as this script, instead of whatever
    # "python" happens to resolve to on PATH.
    # universal_newlines=True makes check_output return text, so the output
    # prints as-is rather than as a bytes repr on Python 3.
    # A non-zero exit status still raises CalledProcessError and stops the
    # sweep, as before.
    print(subprocess.check_output(
        [sys.executable, 'lk_explore.py', videofile, cmd],
        universal_newlines=True))
#!/usr/bin/env python
'''
Lucas-Kanade parameter exploration
==================================
Based on code from https://github.com/opencv/opencv/blob/master/samples/python/lk_track.py
Easy comparison of parameters for Lucas-Kanade sparse optical flow on a video or
webcam. Uses goodFeaturesToTrack for track initialization and back-tracking for
match verification between frames.
Usage
-----
lk_explore.py video_src [parameters]
Keys
----
ESC - exit
'''
# Python 2/3 compatibility
from __future__ import print_function
import argparse
import os
import numpy as np
import cv2 as cv
import video
from time import clock
def nothing(x):
    """No-op: accept one argument, ignore it and return None."""
    return None
def draw_str(dst, target, s):
    """Draw the text `s` onto image `dst` with a dark drop shadow.

    `target` is an (x, y) pair giving the text origin. The string is drawn
    twice: first in black, two pixels thick and offset by one pixel in both
    directions, then in white at the requested position on top of it.
    """
    col, row = target
    shadow_origin = (col + 1, row + 1)
    black = (0, 0, 0)
    white = (255, 255, 255)

    # Shadow pass: thicker, black, shifted down-right by one pixel.
    cv.putText(dst, s, shadow_origin, cv.FONT_HERSHEY_PLAIN, 1.0, black,
               thickness=2, lineType=cv.LINE_AA)
    # Foreground pass: white text at the exact requested origin.
    cv.putText(dst, s, (col, row), cv.FONT_HERSHEY_PLAIN, 1.0, white,
               lineType=cv.LINE_AA)
# Default keyword arguments that App.run forwards to cv.goodFeaturesToTrack.
feature_params = {
    'maxCorners': 100,
    'qualityLevel': 0.3,
    'minDistance': 7,
    'blockSize': 7,
}

# Default keyword arguments that App.run forwards to cv.calcOpticalFlowPyrLK.
lk_params = {
    'winSize': (15, 15),
    'maxLevel': 2,
    'criteria': (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03),
}
class App:
    """Track features through a video with Lucas-Kanade optical flow.

    Every processed frame is annotated with the current tracks, shown in the
    'lk_track' window and appended to an output .mp4 whose name encodes the
    parameters in use. Optionally the per-frame tracks are also written to a
    CSV file.
    """

    def __init__(self, args, user_args, save_tracks=False):
        """Open the capture, apply parameter overrides and create the writer.

        Args:
            args: parsed argparse namespace. `args.video_src` selects the
                source; every other attribute whose name matches a key of
                `feature_params` or `lk_params` overrides that default.
            user_args: the raw command-line options; only used to build the
                CSV file name in `save_data`.
            save_tracks: when true, `run` writes the collected tracks to CSV.

        Raises:
            IOError: if `args.video_src` is a string naming a file that does
                not exist.
        """
        self.track_len = 10
        self.detect_interval = 5
        self.tracks = []
        # Only a string source can be checked on disk; a non-string source
        # (e.g. a camera index) is handed to the capture as-is.
        if (isinstance(args.video_src, str)
                and not os.path.exists(args.video_src)):
            raise IOError('{} not found'.format(args.video_src))
        # `video` is presumably the helper module shipped with the OpenCV
        # Python samples -- TODO confirm.
        self.cap = video.create_capture(args.video_src)
        cv.namedWindow('lk_track')
        # Copy the module-level defaults so the overrides below stay local to
        # this instance instead of mutating the shared dictionaries.
        self.lk_params = dict(lk_params)
        self.feature_params = dict(feature_params)
        for k, v in vars(args).items():
            if k in self.feature_params:
                self.feature_params[k] = v
            elif k in self.lk_params:
                self.lk_params[k] = v
        self.frame_idx = 0
        self.user_args = user_args
        self.save_tracks = save_tracks
        self.init_writer(args)

    def init_writer(self, args):
        """Create `self.out`, the video writer for the annotated frames.

        The output goes to '<video path without extension>/<params>.mp4' (or
        'webcam/<params>.mp4' when the source is not a path). '<params>' is
        built from the sorted argument names as 'name-value' pairs joined by
        underscores, skipping 'video_src' and the 'criteria_params*' entries.
        The directory is created if missing. Frame size and frame rate are
        read from the capture.
        """
        args_vars = vars(args)

        def no_space(x):
            # Strip all whitespace from strings and tuples so the value can be
            # embedded in a file name; other types are returned unchanged.
            if isinstance(x, tuple):
                x = str(x)
            if isinstance(x, str):
                x = ''.join(x.split())
            return x

        parts = []
        for k in sorted(args_vars):
            if 'criteria_params' in k or 'video_src' in k:
                continue
            parts.append('{}-{}'.format(k, no_space(args_vars[k])))
        filename_args = '_'.join(parts)

        try:
            video_dir = os.path.splitext(args.video_src)[0]
        except (TypeError, AttributeError):
            # A non-string source has no path to derive a directory from.
            video_dir = 'webcam'
        self.video_dir = video_dir
        if not os.path.exists(video_dir):
            os.mkdir(video_dir)
        filename = os.path.join(video_dir, filename_args + '.mp4')

        w = self.cap.get(cv.CAP_PROP_FRAME_WIDTH)
        h = self.cap.get(cv.CAP_PROP_FRAME_HEIGHT)
        fps = self.cap.get(cv.CAP_PROP_FPS)
        fourcc = cv.VideoWriter_fourcc(*'mp4v')
        self.out = cv.VideoWriter(filename, fourcc, fps, (int(w), int(h)))

    def save_data(self, data):
        """Write the collected per-frame tracks to a CSV file.

        Args:
            data: iterable of dicts with the keys 'frame' and 'tracks'.

        The file is created in the current directory and named 'data_'
        followed by the text form of `self.user_args` with '--', square
        brackets and single quotes removed.
        """
        import csv
        # user_args is a list of option strings; format it as text before
        # stripping the list punctuation.
        filename = 'data_' + str(self.user_args).replace('--', '').replace(
            '[', '').replace(']', '').replace('\'', '')
        with open(filename, 'w') as csvfile:
            fieldnames = ['frame', 'tracks']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for row in data:
                writer.writerow({
                    'frame': row['frame'],
                    'tracks': row['tracks']
                })
        print("saved to {}".format(filename))

    def run(self):
        """Process the video frame by frame until it ends or ESC is pressed.

        For each frame: existing tracks are advanced with pyramidal
        Lucas-Kanade and kept only if tracking back to the previous frame
        lands within one pixel of the starting point; every `detect_interval`
        frames new features are detected away from the current track ends.
        The annotated frame is displayed and written to `self.out`. The
        capture and the writer are always released on exit, and the tracks
        are saved to CSV afterwards when `self.save_tracks` is set.
        """
        num_frames = int(self.cap.get(cv.CAP_PROP_FRAME_COUNT))
        print(num_frames)
        data = []
        try:
            for idx in range(num_frames):
                # set frame index
                self.cap.set(cv.CAP_PROP_POS_FRAMES, idx)
                _ret, frame = self.cap.read()
                if not _ret:
                    print('Failed to read frame {}'.format(idx))
                    continue
                frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
                vis = frame.copy()

                if len(self.tracks) > 0:
                    img0, img1 = self.prev_gray, frame_gray
                    p0 = np.float32([tr[-1] for tr in self.tracks]).reshape(
                        -1, 1, 2)
                    # Forward flow, then backward flow from the result; a
                    # point is trusted only if the round trip returns to
                    # (almost) where it started.
                    p1, _st, _err = cv.calcOpticalFlowPyrLK(
                        img0, img1, p0, None, **self.lk_params)
                    p0r, _st, _err = cv.calcOpticalFlowPyrLK(
                        img1, img0, p1, None, **self.lk_params)
                    d = abs(p0 - p0r).reshape(-1, 2).max(-1)
                    good = d < 1
                    new_tracks = []
                    for tr, (x, y), good_flag in zip(
                            self.tracks, p1.reshape(-1, 2), good):
                        if not good_flag:
                            continue
                        tr.append((x, y))
                        if len(tr) > self.track_len:
                            del tr[0]
                        new_tracks.append(tr)
                        # Drawing functions need integer pixel coordinates;
                        # the flow output is float32.
                        cv.circle(vis, (int(x), int(y)), 2, (0, 255, 0), -1)
                    self.tracks = new_tracks
                    cv.polylines(vis, [np.int32(tr) for tr in self.tracks],
                                 False, (0, 255, 0))
                    draw_str(vis, (20, 20),
                             'track count: %d' % len(self.tracks))
                    row = {
                        'frame': idx,
                        'tracks': [np.int32(tr) for tr in self.tracks]
                    }
                    data.append(row)

                if self.frame_idx % self.detect_interval == 0:
                    # Mask out a small disc around every current track end so
                    # new features are only picked up elsewhere.
                    mask = np.zeros_like(frame_gray)
                    mask[:] = 255
                    for x, y in [np.int32(tr[-1]) for tr in self.tracks]:
                        cv.circle(mask, (x, y), 5, 0, -1)
                    p = cv.goodFeaturesToTrack(
                        frame_gray, mask=mask, **self.feature_params)
                    if p is not None:
                        for x, y in np.float32(p).reshape(-1, 2):
                            self.tracks.append([(x, y)])

                self.frame_idx += 1
                self.prev_gray = frame_gray
                cv.imshow('lk_track', vis)
                self.out.write(vis)

                # waitKey lets the window repaint and polls the keyboard;
                # 27 is the ESC key.
                ch = cv.waitKey(1)
                if ch == 27:
                    break
        finally:
            # Release even on error so the output file is finalised.
            self.cap.release()
            self.out.release()
        if self.save_tracks:
            self.save_data(data)
def main():
    """Parse the command line, assemble the parameters and run the app.

    The two `criteria_params*` options are combined into the termination
    criteria tuple and the scalar `--winSize` is expanded to a square
    (size, size) window. Both are stored back on the parsed namespace, where
    App picks them up as Lucas-Kanade parameter overrides. A source of '0'
    is converted to the integer 0.
    """
    import sys
    import argparse
    parser = argparse.ArgumentParser(
        description='Lukas Kanade Optical Flow parameter explainer.')
    parser.add_argument('video_src', type=str, default=0)
    parser.add_argument('--maxCorners', type=int, default=100)
    parser.add_argument('--maxLevel', type=int, default=2)
    parser.add_argument('--qualityLevel', type=float, default=0.3)
    parser.add_argument('--minDistance', type=int, default=7)
    parser.add_argument('--blockSize', type=int, default=7)
    parser.add_argument('--winSize', type=int, default=15)
    parser.add_argument('--criteria_params1', type=int, default=10)
    parser.add_argument('--criteria_params2', type=float, default=0.03)
    args = parser.parse_args()

    criteria = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT,
                args.criteria_params1, args.criteria_params2)
    # vars() returns the namespace's own dict, so writes here are visible as
    # attributes on `args`.
    arg_vars = vars(args)
    arg_vars['criteria'] = criteria
    # Compare by value: identity of equal strings is an implementation detail.
    if args.video_src == '0':
        arg_vars['video_src'] = 0
    arg_vars['winSize'] = (args.winSize, args.winSize)

    print(__doc__)
    try:
        App(args, user_args=sys.argv[2:]).run()
    finally:
        # Close the display window even if the run fails.
        cv.destroyAllWindows()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment