Skip to content

Instantly share code, notes, and snippets.

@paulserraino
Created May 21, 2017 23:16
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save paulserraino/6b18a2667f7de1a55a8597b3bed093ce to your computer and use it in GitHub Desktop.
Save paulserraino/6b18a2667f7de1a55a8597b3bed093ce to your computer and use it in GitHub Desktop.
Predicting Vehicle Motion and Direction using Optical Flow

OSX Install

brew install opencv3 --with-contrib

make sure to include contrib!

import cv2
import numpy as np
cap = cv2.VideoCapture('driving_sf.mp4')
# Util functions
def dist(a, b):
return np.sqrt(np.power(b[0] - a[0], 2) + np.power(b[1] - a[1], 2))
def get_angle(v1, v2):
dx = v2[0] - v1[0]
dy = v2[1] - v2[1]
return np.arctan2(dy, dx) * 180 / np.pi
def norm_dist(v1, v2, sig=15):
theta = get_angle(v1, v2)
x = v1[0] + sig * np.cos(theta)
y = v1[1] + sig * np.sin(theta)
#print 'check', dist((x, y), v1)
return v1, (int(x), int(y))
# Lucas Kanade optical flow
def optical_flow(old_gray, frame_gray, p0):
lk_params = dict( winSize = (15,15),
maxLevel = 2,
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
if st is not None:
good_new = p1[st==1]
good_old = p0[st==1]
return good_new, good_old
else:
return None, None
# Displays output for each frame
def draw_status(frame, mean_angle, kps):
c = (161,242,108)
h = 230
cv2.putText(frame, 'mean angle: ' + str(int(mean_angle)), (20, 20 + h), cv2.FONT_HERSHEY_TRIPLEX, 0.5, c)
cv2.putText(frame, 'keypoints: ' + str(kps), (20, 40 + h), cv2.FONT_HERSHEY_TRIPLEX, 0.5, c)
if 90 > mean_angle > 0:
motion = 'forward'
dirc = 'right'
elif 180 > mean_angle > 90:
motion = 'forward'
dirc = 'left'
elif 270 > mean_angle > 180:
motion = 'backward'
dirc = 'left'
elif 360 > mean_angle > 270:
motion = 'backward'
dirc = 'right'
else:
motion = 'stopped'
dirc = 'n/a'
cv2.putText(frame, 'motion: ' + motion, (20, 60 + h), cv2.FONT_HERSHEY_TRIPLEX, 0.5, c)
cv2.putText(frame, 'direction: ' + dirc, (20, 80 + h), cv2.FONT_HERSHEY_TRIPLEX, 0.5, c)
if 60 > mean_angle > 0:
theta = 45
elif 120 > mean_angle > 60:
theta = 90
elif 180 > mean_angle > 120:
theta = 135
else:
theta = mean_angle
bkp = (20, 0 + h)
ax = int(bkp[0] + 15 * np.cos(-theta))
ay = int(bkp[1] + 15 * np.sin(-theta))
cv2.arrowedLine(frame, bkp, (ax, ay), c, 2, tipLength=0.5)
cv2.circle(frame,bkp, 4, c,-1)
return frame
# Initialize the First Frame
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)
feature_detector = cv2.ORB_create(400)
def get_features(gray):
kp, descs = feature_detector.detectAndCompute(gray, None)
return np.array([np.array([k.pt]).astype(np.float32,) for k in kp])
p0 = get_features(old_gray)
mask = np.zeros_like(old_frame)
color = np.random.randint(0,255,(500,3))
frame_count = 0
current_angle = 0
while(1):
frame_count += 1
ret, frame = cap.read()
frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
angles = [0]
kps = 0
good_new, good_old = optical_flow(old_gray, frame_gray, p0)
if good_new is not None:
min_distance = 0.8
# Collect angle between keypoint points and draw vectors to frame
for i,(new,old) in enumerate(zip(good_new,good_old)):
a,b = new.ravel()
c,d = old.ravel()
if dist((a,b), (c, d)) > min_distance:
kps += 1
angles.append(get_angle((a,b), (c,d)))
xx, yy = norm_dist((a,b), (c,d))
xx, yy = norm_dist((a,b), (c,d))
cv2.arrowedLine(frame, xx, yy, color[i].tolist(), 2, tipLength=0.5)
cv2.circle(frame,(a,b), 4, color[i].tolist(),-1)
# Calculate the mean angle every n frames
if frame_count % 1 == 0:
current_angle = np.mean(angles)
frame = draw_status(frame, current_angle, kps)
cv2.imshow('frame', frame)
k = cv2.waitKey(30) & 0xff
if k == 27:
break
# Update Frame and Keypoints
old_gray = frame_gray.copy()
if good_new is not None:
p0 = good_new.reshape(-1,1,2)
else:
p0 = get_features(old_gray)
cv2.destroyAllWindows()
cap.release()
@jet-c-21
Copy link

jet-c-21 commented May 6, 2022

at line 13, isn't it should be dy = v2[1] - v1[1] ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment