Car speed estimation from a windshield camera

A simple optical-flow-based approach for estimating speed from a single windshield camera.
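Under the planar-road model, each tracked feature contributes estimates of the ratio V/(h·f) (vehicle speed over camera height times focal length), so the per-frame median of these ratios is proportional to the true speed. The unknown scale h·f is then recovered with a least-squares fit against ground-truth speeds over the first frames and applied to the rest of the drive. A minimal sketch of that calibration step; the array names here are illustrative, not taken from the scripts below:

import numpy as np

# Per-frame median of the V/(h*f) ratios and the corresponding ground-truth speeds [m/s]
# (random stand-in data, just to make the sketch runnable).
median_est = np.random.rand(1000) * 0.02
gt_speed = 550.0 * median_est + 0.3 * np.random.randn(1000)

# Fit a single scale factor so that median_est * hf ~= gt_speed (no intercept term).
hf, *_ = np.linalg.lstsq(median_est.reshape(-1, 1), gt_speed.reshape(-1, 1), rcond=None)
speed_estimate = median_est * hf[0, 0]  # calibrated speed for every frame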

#!/usr/bin/env python
'''
Plot theoretical optical flow
=============================
Plots the theoretical optical flow induced by a camera moving parallel to a
plane that's orthogonal to the image plane.
'''
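# The field plotted below is the planar-road flow model also used by the
# estimation script: for a forward-moving camera over a flat road, the flow at
# image point (x, y) grows with x*y horizontally and with y*y vertically.
# The 0.01 constant is just an arbitrary stand-in for the V/(h*f)
# proportionality factor, chosen so the arrows are a readable length.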
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
import numpy as np
from IPython import embed
y_size = 10
fig = plt.figure(figsize=(6, 4), dpi=80)
ax = fig.add_subplot(111)
flow_x = lambda x, y: 0.01 * (x * y)
flow_y = lambda _, y: 0.01 * (y * y)
for x in range(-15, 15, 2):
    for y in range(y_size, 0, -2):
        ax.arrow(x, y_size - y, flow_x(x, y), -flow_y(x, y),
                 head_length=0.3, head_width=0.2, ec='k', fc='k')
        # ax.add_line(Line2D([x, x+flow_x(x, y)], [y_size - y, y_size - y - flow_y(x, y)]))
ax.set_xlim(-15, 15)
ax.set_ylim(-2, 10)
ax.axis('off')
ax.figure.savefig('flow_example.png', bbox_inches='tight')
# embed()
plt.show()
#!/usr/bin/env python
'''
Speed estimation using monocular optical flow
=============================================
We estimate the speed of a monocular camera traveling on a road.
The assumption is that the observed optical flow will correspond
to the one induced by a translating camera that's observing a
plane (i.e. the road).
'''
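# Geometry behind the V_hf computation below: with the camera at height h above
# the road plane, focal length f, and forward speed V, a road point imaged at
# (x, y) (coordinates measured from the principal point, which is assumed to
# lie on the horizon, with y increasing downwards) has depth Z = f*h / y and
# induces the optical flow
#     dx = (V / (h*f)) * x * y
#     dy = (V / (h*f)) * y * y
# so each tracked feature yields two estimates of the ratio V / (h*f):
# dx / (x*y) from the horizontal component and dy / y**2 from the vertical one.
# The ratio is proportional to speed; the unknown h*f scale is fitted against
# the ground-truth speeds from drive.json at the end of run().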
import numpy as np
import cv2
import json
from matplotlib import pyplot as plt
import pandas as pd
# Settings for the LK tracker and the corner detector
lk_params = dict(winSize=(15, 15),
                 maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

feature_params = dict(maxCorners=500,
                      qualityLevel=0.1,
                      minDistance=7,
                      blockSize=5)
# Image crops from left and top edges
crop_left = 0
crop_top = 320


class App:
    def __init__(self, video_src):
        self.track_len = 10
        self.detect_interval = 1
        self.tracks = []
        self.cap = cv2.VideoCapture(video_src)
        self.frame_idx = 0

        # Load ground truth
        gt_raw = np.array(json.load(open('drive.json')))

        # Holds data for each image frame
        self.data_table = pd.DataFrame.from_dict({
            'time': gt_raw[:, 0],
            'gt': gt_raw[:, 1],
            'median': np.nan,
            'mean': np.nan,
            'median_h': np.nan,
            'median_v': np.nan,
            'mean_h': np.nan,
            'mean_v': np.nan})

    def run(self):
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                break

            frame = frame[crop_top:440, crop_left:550]
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            vis = frame.copy()

            #
            # Optical flow tracking
            if len(self.tracks) > 0:
                img0, img1 = self.prev_gray, frame_gray

                # Compute optical flow
                p0 = self.tracks
                p1, st, err = cv2.calcOpticalFlowPyrLK(img0, img1, p0, None, **lk_params)
                p0r, st, err = cv2.calcOpticalFlowPyrLK(img1, img0, p1, None, **lk_params)

                # Backtracking check
                d = abs(p0 - p0r).reshape(-1, 2).max(-1)
                good = d < 1e9  # threshold is effectively disabled, so all tracks are kept

                # Some visualization
                for (x0, y0), (x1, y1), good_flag in zip(p0.reshape(-1, 2), p1.reshape(-1, 2), good):
                    cv2.circle(vis, (int(x1), int(y1)), 2, (0, 255, 0), -1)
                    pl = np.array([[x0, y0], [x1, y1]], dtype=np.int32)
                    cv2.polylines(vis, [pl], 1, (0, 255, 0))

                # Build an array of (x, y, dx, dy) flows
                flows = np.hstack((p0.reshape(-1, 2), (p1 - p0).reshape(-1, 2)))

                # Switch pixel coordinates to a new frame centered in the middle point
                flows[:, 0] += crop_left - 320
                flows[:, 1] += crop_top - 240

                # Compute the V/hf factor for each flow (2 per flow)
                n_flows = flows.shape[0]
                if n_flows:
                    V_hf = np.zeros(n_flows * 2)
                    V_hf[:n_flows] = flows[:, 2] / flows[:, 0] / flows[:, 1]  # H flow
                    V_hf[n_flows:] = flows[:, 3] / (flows[:, 1] ** 2)  # V flow
                    # Store output
                    self.data_table.loc[self.frame_idx, 'median'] = np.median(V_hf)
                    self.data_table.loc[self.frame_idx, 'median_h'] = np.median(V_hf[:n_flows])
                    self.data_table.loc[self.frame_idx, 'median_v'] = np.median(V_hf[n_flows:])
                    self.data_table.loc[self.frame_idx, 'mean'] = np.mean(V_hf)
                    self.data_table.loc[self.frame_idx, 'mean_h'] = np.mean(V_hf[:n_flows])
                    self.data_table.loc[self.frame_idx, 'mean_v'] = np.mean(V_hf[n_flows:])

            #
            # Detection of new points
            # Mask areas close to current points
            mask = np.zeros_like(frame_gray)
            mask[:] = 255
            for x, y in [np.int32(tr[-1]) for tr in self.tracks]:
                cv2.circle(mask, (x, y), 5, 0, -1)
            # Mask top center (likely covered by the car ahead)
            cv2.rectangle(mask, (200, 0), (440, 50), 0, -1)
            # Mask edges (outside the road plane)
            cv2.fillPoly(mask,
                         np.array([[(345, 0), (frame_gray.shape[1], 65), (frame_gray.shape[1], 0)]],
                                  dtype=np.int32), 0)
            # cv2.imshow('mask', mask)

            p = cv2.goodFeaturesToTrack(frame_gray, mask=mask, **feature_params)
            if p is not None:
                self.tracks = p  # p.shape == (n, 1, 2)

            self.frame_idx += 1
            self.prev_gray = frame_gray

            # Flip to True to show the tracked points and their flow vectors
            if False:
                cv2.imshow('lk_track', vis)
                ch = 0xFF & cv2.waitKey(1)
                if ch == 27:
                    break

        #
        # Final processing
        self.cap.release()

        # Estimate scale by using the first n frames
        num_training_frames = 1000
        hf_factor = np.linalg.lstsq(
            self.data_table['median'][1:num_training_frames].values.reshape(-1, 1),
            self.data_table['gt'][1:num_training_frames].values.reshape(-1, 1),
            rcond=None)[0][0][0]
        print("Estimated hf factor = {}".format(hf_factor))

        # Use the estimated scale for the rest of the images
        self.data_table[['median', 'median_h', 'median_v', 'mean', 'mean_h', 'mean_v']] *= hf_factor

        # Some low-passing
        self.data_table['median_low'] = self.data_table['median'].rolling(window=30).mean()

        # Plot
        ax = self.data_table[['gt', 'median_low']].plot(figsize=(8, 5))
        ax.axvline(x=num_training_frames, color='k', ls='--')
        ax.set_xlim(0, 4000)
        ax.set_xlabel('Frame num.')
        ax.set_ylabel('Speed [m/s]')
        ax.figure.savefig('result.png', bbox_inches='tight')
        plt.show()


def main():
    print(__doc__)
    App('drive.mp4').run()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
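The script expects drive.json alongside drive.mp4. From the way gt_raw is indexed (column 0 becomes 'time', column 1 becomes 'gt', and the result plot is labelled in m/s), it appears to be a JSON array of [timestamp, speed] pairs, one per video frame. A minimal sketch for generating a placeholder file for a dry run; the frame rate and speed values below are made up, not taken from the gist:

import json

n_frames = 4000
fps = 20.0  # assumed frame rate, not taken from the gist

# One [timestamp, speed_m_per_s] pair per frame; a constant 10 m/s as dummy data.
ground_truth = [[i / fps, 10.0] for i in range(n_frames)]

with open('drive.json', 'w') as f:
    json.dump(ground_truth, f)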
@wironomy

Dear nicolov,
Spectacular work, but could you upload drive.json, or at least point out what ground truth is used in the code?
