Last active
October 18, 2022 21:01
-
-
Save JustinShenk/f4ba14a271202700f3251e73f6150696 to your computer and use it in GitHub Desktop.
Compare Videos with Lucas-Kanade Optical Flow Parameters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Sweep lk_explore.py across a range of Lucas-Kanade parameter settings."""
import subprocess
import sys

videofile = 'crowd_trimmed.mp4'

# One subprocess run per override; lk_explore.py applies it on top of its
# defaults, so each entry isolates the effect of a single parameter value.
cmds = [
    '--maxCorners=10', '--maxCorners=50', '--maxCorners=100',
    '--qualityLevel=0.1', '--qualityLevel=0.8', '--minDistance=2',
    '--minDistance=40', '--winSize=5', '--winSize=100', '--blockSize=2',
    '--blockSize=20', '--criteria_params1=5', '--criteria_params1=20',
    '--criteria_params2=0.01', '--criteria_params2=0.08',
    '--criteria_params2=0.2', '--criteria_params1=40', '--winSize=400',
    '--minDistance=100', '--qualityLevel=0.1', '--qualityLevel=0.95',
    '--blockSize=100', '--blockSize=1', '--maxLevel=0', '--maxLevel=4',
    '--winSize=7', '--blockSize=3', '--winSize=3', '--maxCorners=5',
    '--maxCorners=500'
]

for cmd in cmds:
    try:
        # sys.executable guarantees the child uses the same interpreter.
        out = subprocess.check_output(
            [sys.executable, 'lk_explore.py', videofile, cmd])
        # Decode so Python 3 prints text instead of a bytes repr (b'...').
        print(out.decode('utf-8', errors='replace'))
    except subprocess.CalledProcessError as err:
        # A single failed setting should not abort the rest of the sweep.
        print('run failed for {}: {}'.format(cmd, err))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Lucas-Kanade parameter exploration | |
================================== | |
Based on code from https://github.com/opencv/opencv/blob/master/samples/python/lk_track.py | |
Easy comparison of parameters for Lucas-Kanade sparse optical flow on a video or | |
webcam. Uses goodFeaturesToTrack for track initialization and back-tracking for | |
match verification between frames. | |
Usage | |
----- | |
lk_explore.py video_src [parameters] | |
Keys | |
---- | |
ESC - exit | |
''' | |
# Python 2/3 compatibility | |
from __future__ import print_function | |
import argparse | |
import os | |
import numpy as np | |
import cv2 as cv | |
import video | |
from time import clock | |
def nothing(x):
    """No-op callback placeholder (e.g. for OpenCV trackbar callbacks)."""
    return None
def draw_str(dst, target, s):
    """Render string *s* on image *dst* at *target* with a drop shadow.

    Draws a thick black copy offset by one pixel first, then a white
    copy on top, so the text stays legible on any background.
    """
    x, y = target
    # (origin, color, thickness) for the shadow pass, then the foreground.
    passes = (
        ((x + 1, y + 1), (0, 0, 0), 2),
        ((x, y), (255, 255, 255), 1),
    )
    for origin, color, thick in passes:
        cv.putText(dst, s, origin, cv.FONT_HERSHEY_PLAIN, 1.0, color,
                   thickness=thick, lineType=cv.LINE_AA)
# Defaults for cv.goodFeaturesToTrack (Shi-Tomasi corner detection).
feature_params = {
    'maxCorners': 100,
    'qualityLevel': 0.3,
    'minDistance': 7,
    'blockSize': 7,
}

# Defaults for cv.calcOpticalFlowPyrLK (pyramidal Lucas-Kanade flow).
lk_params = {
    'winSize': (15, 15),
    'maxLevel': 2,
    'criteria': (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 10, 0.03),
}
class App:
    """Track Lucas-Kanade sparse optical flow over a video.

    Features are found with goodFeaturesToTrack and verified between
    frames by forward/backward flow; the annotated video is written to
    a file whose name encodes the parameter set used.
    """

    def __init__(self, args, user_args, save_tracks=False):
        self.track_len = 10        # max positions remembered per track
        self.detect_interval = 5   # re-detect features every N frames
        self.tracks = []
        if not os.path.exists(args.video_src):
            # Was `raise ('...')`, which raises a TypeError (strings are
            # not exceptions); raise a proper exception instead.
            raise IOError('{} not found'.format(args.video_src))
        self.cap = video.create_capture(args.video_src)
        cv.namedWindow('lk_track')
        self.lk_params = lk_params
        self.feature_params = feature_params
        # Override the module-level defaults with matching CLI arguments.
        for k, v in vars(args).items():
            if k in self.feature_params:
                self.feature_params[k] = v
            elif k in self.lk_params:
                self.lk_params[k] = v
        self.frame_idx = 0
        self.init_writer(args)
        self.user_args = user_args
        self.save_tracks = save_tracks

    def init_writer(self, args):
        """Open the output VideoWriter; its filename encodes the args."""
        filename_args = ''
        cnt = 0
        args_vars = vars(args)

        def no_space(x):
            # Strip whitespace so values like (15, 15) are filename-safe.
            if isinstance(x, str):
                x = ''.join(x.split())
            elif isinstance(x, tuple):
                x = ''.join(str(x).split())
            return x

        for k in sorted(args_vars.keys()):
            # criteria_params are already folded into 'criteria'; the
            # source path goes into the directory name instead.
            if 'criteria_params' in k or 'video_src' in k:
                continue
            leading_underscore = '_' if cnt else ''
            filename_args += '{}{}-{}'.format(leading_underscore, k,
                                              no_space(args_vars[k]))
            cnt += 1
        try:
            video_dir = os.path.splitext(args.video_src)[0]
        except TypeError:
            # video_src is an int camera index (webcam), not a path.
            video_dir = 'webcam'
        self.video_dir = video_dir
        if not os.path.exists(video_dir):
            os.mkdir(video_dir)
        filename = os.path.join(video_dir, filename_args + '.mp4')
        w = self.cap.get(cv.CAP_PROP_FRAME_WIDTH)
        h = self.cap.get(cv.CAP_PROP_FRAME_HEIGHT)
        fps = self.cap.get(cv.CAP_PROP_FPS)
        fourcc = cv.VideoWriter_fourcc(*'mp4v')
        self.out = cv.VideoWriter(filename, fourcc, fps, (int(w), int(h)))

    def save_data(self, data):
        """Write per-frame track data to a CSV named after the CLI args.

        Was defined without `self` and called as a bare name (NameError);
        it is now a proper method.
        """
        import csv
        # user_args is a list (sys.argv[2:]); stringify it before
        # stripping characters that are awkward in filenames.
        filename = 'data_' + str(self.user_args).replace('--', '').replace(
            '[', '').replace(']', '').replace('\'', '')
        with open(filename, 'w') as csvfile:
            fieldnames = ['frame', 'tracks']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for row in data:
                writer.writerow({
                    'frame': row['frame'],
                    'tracks': row['tracks']
                })
        print("saved to {}".format(filename))

    def run(self):
        """Process every frame: track, verify, draw, and write output."""
        num_frames = int(self.cap.get(cv.CAP_PROP_FRAME_COUNT))
        print(num_frames)
        # NOTE: the original also did cap.set(CAP_PROP_FRAME_COUNT, ...),
        # which is not a settable property; removed as a no-op.
        data = []
        for idx in range(num_frames):
            self.cap.set(cv.CAP_PROP_POS_FRAMES, idx)
            _ret, frame = self.cap.read()
            if not _ret:
                print('Failed to read frame {}'.format(idx))
                continue
            frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            vis = frame.copy()
            if len(self.tracks) > 0:
                img0, img1 = self.prev_gray, frame_gray
                p0 = np.float32([tr[-1] for tr in self.tracks]).reshape(
                    -1, 1, 2)
                # Forward flow, then backward flow; a point is kept only
                # if the round trip lands within one pixel of its start.
                p1, _st, _err = cv.calcOpticalFlowPyrLK(
                    img0, img1, p0, None, **self.lk_params)
                p0r, _st, _err = cv.calcOpticalFlowPyrLK(
                    img1, img0, p1, None, **self.lk_params)
                d = abs(p0 - p0r).reshape(-1, 2).max(-1)
                good = d < 1
                new_tracks = []
                for tr, (x, y), good_flag in zip(self.tracks, p1.reshape(
                        -1, 2), good):
                    if not good_flag:
                        continue
                    tr.append((x, y))
                    # Cap track history so memory stays bounded.
                    if len(tr) > self.track_len:
                        del tr[0]
                    new_tracks.append(tr)
                    cv.circle(vis, (x, y), 2, (0, 255, 0), -1)
                self.tracks = new_tracks
                cv.polylines(vis, [np.int32(tr) for tr in self.tracks], False,
                             (0, 255, 0))
                draw_str(vis, (20, 20), 'track count: %d' % len(self.tracks))
                data.append({
                    'frame': idx,
                    'tracks': [np.int32(tr) for tr in self.tracks]
                })
            if self.frame_idx % self.detect_interval == 0:
                # Mask out neighborhoods of existing points so re-detection
                # only adds features in uncovered regions.
                mask = np.zeros_like(frame_gray)
                mask[:] = 255
                for x, y in [np.int32(tr[-1]) for tr in self.tracks]:
                    cv.circle(mask, (x, y), 5, 0, -1)
                p = cv.goodFeaturesToTrack(
                    frame_gray, mask=mask, **self.feature_params)
                if p is not None:
                    for x, y in np.float32(p).reshape(-1, 2):
                        self.tracks.append([(x, y)])
            self.frame_idx += 1
            self.prev_gray = frame_gray
            cv.imshow('lk_track', vis)
            self.out.write(vis)
        self.cap.release()
        self.out.release()
        if self.save_tracks:
            self.save_data(data)
def main():
    """Parse command-line arguments and run the explorer app."""
    import sys
    # `argparse` is already imported at module level; the duplicate
    # function-local import was removed.
    parser = argparse.ArgumentParser(
        description='Lucas-Kanade optical flow parameter explorer.')
    parser.add_argument('video_src', type=str, default=0)
    parser.add_argument('--maxCorners', type=int, default=100)
    parser.add_argument('--maxLevel', type=int, default=2)
    parser.add_argument('--qualityLevel', type=float, default=0.3)
    parser.add_argument('--minDistance', type=int, default=7)
    parser.add_argument('--blockSize', type=int, default=7)
    parser.add_argument('--winSize', type=int, default=15)
    parser.add_argument('--criteria_params1', type=int, default=10)
    parser.add_argument('--criteria_params2', type=float, default=0.03)
    args = parser.parse_args()
    # Fold the two criteria scalars into the tuple OpenCV expects.
    criteria = (cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT,
                args.criteria_params1, args.criteria_params2)
    arg_vars = vars(args)
    arg_vars['criteria'] = criteria
    # '0' on the command line means "use the default webcam".
    # Was `is '0'` (identity test on a literal) and `arg_vars[video_src]`
    # (NameError: unquoted key); both fixed.
    if args.video_src == '0':
        arg_vars['video_src'] = 0
    # calcOpticalFlowPyrLK wants a square (w, h) window, not an int.
    arg_vars['winSize'] = (args.winSize, args.winSize)
    print(__doc__)
    App(args, user_args=sys.argv[2:]).run()
    cv.destroyAllWindows()
# Script entry point: only run when executed directly, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment