Car speed estimation from a windshield camera
A simple optical-flow based approach for estimating speed using a single windshield camera.
*.pyc
*.tar.gz
*.mp4
#!/usr/bin/env python
'''
Plot theoretical optical flow
=============================

Plots the theoretical optical flow induced by a camera moving parallel to a
plane that's orthogonal to the image plane (e.g. a dashcam looking at the
road). The result is saved to ``flow_example.png`` and shown on screen.
'''
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
import numpy as np
from IPython import embed

# Vertical extent of the plotted grid, in arbitrary image units.
y_size = 10

fig = plt.figure(figsize=(6, 4), dpi=80)
ax = fig.add_subplot(111)


def flow_x(x, y):
    """Horizontal flow component at image point (x, y): proportional to x*y."""
    return 0.01 * (x * y)


def flow_y(_, y):
    """Vertical flow component at image point (_, y): proportional to y**2."""
    return 0.01 * (y * y)


# Draw one arrow per grid point; the y axis is flipped (y_size - y) so the
# flow points "toward the camera" as it does on a road image.
for x in range(-15, 15, 2):
    for y in range(y_size, 0, -2):
        ax.arrow(x, y_size - y, flow_x(x, y), -flow_y(x, y),
                 head_length=0.3, head_width=0.2, ec='k', fc='k')

ax.set_xlim(-15, 15)
ax.set_ylim(-2, 10)
ax.axis('off')
ax.figure.savefig('flow_example.png', bbox_inches='tight')
plt.show()
#!/usr/bin/env python | |
''' | |
Speed estimation using monocular optical flow | |
============================================= | |
We estimate the speed of a monocular camera traveling on a road. | |
The assumption is that the observed optical flow will correspond | |
to the one induced by a translating camera that's observing a | |
plane (i.e. road). | |
''' | |
import numpy as np | |
import cv2 | |
import json | |
from time import clock | |
from IPython import embed | |
from matplotlib import pyplot as plt | |
import pandas as pd | |
# Parameters for the Lucas-Kanade tracker and the Shi-Tomasi corner detector.
lk_params = {
    'winSize': (15, 15),
    'maxLevel': 2,
    'criteria': (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
}
feature_params = {
    'maxCorners': 500,
    'qualityLevel': 0.1,
    'minDistance': 7,
    'blockSize': 5,
}

# Image crops from the left and top edges (pixels).
crop_left = 0
crop_top = 320
class App:
    """Estimates camera speed from a dashcam video via LK optical flow.

    Tracks corner features between consecutive frames, converts each flow
    vector into a V/hf factor (speed over height-times-focal-length, valid
    for a camera translating above a ground plane), then fits the unknown
    hf scale against ground-truth speeds from the first frames.
    """

    def __init__(self, video_src):
        """Open the video and load per-frame ground-truth speeds.

        :param video_src: path (or device index) passed to cv2.VideoCapture.
        """
        self.track_len = 10
        self.detect_interval = 1
        self.tracks = []
        self.cap = cv2.VideoCapture(video_src)
        self.frame_idx = 0
        # Ground truth: rows of (timestamp, speed) — assumed; confirm against
        # the drive.json producer.
        with open('drive.json') as f:
            gt_raw = np.array(json.load(f))
        # One row per frame; estimate columns stay NaN until computed.
        self.data_table = pd.DataFrame.from_dict({
            'time': gt_raw[:, 0],
            'gt': gt_raw[:, 1],
            'median': np.nan,
            'mean': np.nan,
            'median_h': np.nan,
            'median_v': np.nan,
            'mean_h': np.nan,
            'mean_v': np.nan})

    @staticmethod
    def _v_hf(flows):
        """Compute the V/hf factor for each flow vector (two per flow).

        :param flows: float array of shape (n, 4) with rows (x, y, dx, dy)
                      in coordinates centered on the image midpoint.
        :returns: 1-D array of length 2n — first n from the horizontal flow
                  components (dx / (x*y)), last n from the vertical ones
                  (dy / y**2).
        """
        n_flows = flows.shape[0]
        v_hf = np.empty(n_flows * 2)
        v_hf[:n_flows] = flows[:, 2] / flows[:, 0] / flows[:, 1]  # H flow
        v_hf[n_flows:] = flows[:, 3] / (flows[:, 1] ** 2)         # V flow
        return v_hf

    def run(self):
        """Process the video frame by frame, fit the hf scale, and plot."""
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                break
            # Keep only the road region of the frame.
            frame = frame[crop_top:440, crop_left:550]
            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            vis = frame.copy()

            # Optical flow tracking between the previous and current frame.
            if len(self.tracks) > 0:
                img0, img1 = self.prev_gray, frame_gray
                p0 = self.tracks
                p1, st, err = cv2.calcOpticalFlowPyrLK(img0, img1, p0, None, **lk_params)
                p0r, st, err = cv2.calcOpticalFlowPyrLK(img1, img0, p1, None, **lk_params)
                # Forward-backward consistency check.
                d = abs(p0 - p0r).reshape(-1, 2).max(-1)
                # NOTE(review): 1e9 accepts every track; a threshold near 1 px
                # would actually reject bad matches. Kept to preserve behavior.
                good = d < 1e9
                # Visualize the tracked points and their displacement.
                for (x0, y0), (x1, y1), good_flag in zip(p0.reshape(-1, 2), p1.reshape(-1, 2), good):
                    # OpenCV >= 4 requires integer pixel coordinates.
                    cv2.circle(vis, (int(x1), int(y1)), 2, (0, 255, 0), -1)
                    pl = np.array([[x0, y0], [x1, y1]], dtype=np.int32)
                    cv2.polylines(vis, [pl], 1, (0, 255, 0))
                # Build an array of (x, y, dx, dy) flows.
                flows = np.hstack((p0.reshape(-1, 2), (p1 - p0).reshape(-1, 2)))
                # Switch pixel coordinates to a frame centered at the image middle.
                flows[:, 0] += crop_left - 320
                flows[:, 1] += crop_top - 240
                n_flows = flows.shape[0]
                if n_flows:
                    V_hf = self._v_hf(flows)
                    # .loc avoids pandas chained assignment, which can write
                    # to a temporary copy instead of the table itself.
                    row = self.frame_idx
                    self.data_table.loc[row, 'median'] = np.median(V_hf)
                    self.data_table.loc[row, 'median_h'] = np.median(V_hf[:n_flows])
                    self.data_table.loc[row, 'median_v'] = np.median(V_hf[n_flows:])
                    self.data_table.loc[row, 'mean'] = np.mean(V_hf)
                    self.data_table.loc[row, 'mean_h'] = np.mean(V_hf[:n_flows])
                    self.data_table.loc[row, 'mean_v'] = np.mean(V_hf[n_flows:])

            # Detection of new points.
            # Mask areas close to currently tracked points.
            mask = np.zeros_like(frame_gray)
            mask[:] = 255
            for x, y in [np.int32(tr[-1]) for tr in self.tracks]:
                cv2.circle(mask, (x, y), 5, 0, -1)
            # Mask top center (likely covered by the car ahead).
            cv2.rectangle(mask, (200, 0), (440, 50), 0, -1)
            # Mask edges (outside the road plane).
            cv2.fillPoly(mask,
                         np.array([[(345, 0), (frame_gray.shape[1], 65), (frame_gray.shape[1], 0)]]), 0)
            p = cv2.goodFeaturesToTrack(frame_gray, mask=mask, **feature_params)
            if p is not None:
                self.tracks = p  # p.shape == (n, 1, 2)
            self.frame_idx += 1
            self.prev_gray = frame_gray

            if False:  # flip to True to watch the tracker; Esc quits.
                cv2.imshow('lk_track', vis)
                ch = 0xFF & cv2.waitKey(1)
                if ch == 27:
                    break

        # Final processing.
        self.cap.release()
        # Estimate the hf scale with least squares over the first n frames
        # (frame 0 is skipped: it has no flow estimate).
        num_training_frames = 1000
        est = self.data_table['median'][1:num_training_frames].to_numpy().reshape(-1, 1)
        gt = self.data_table['gt'][1:num_training_frames].to_numpy().reshape(-1, 1)
        # rcond=None selects numpy's current default conditioning cutoff.
        hf_factor = np.linalg.lstsq(est, gt, rcond=None)[0][0][0]
        print("Estimated hf factor = {}".format(hf_factor))
        # Apply the estimated scale to all estimate columns.
        self.data_table[['median', 'median_h', 'median_v', 'mean', 'mean_h', 'mean_v']] *= hf_factor
        # Low-pass the median estimate with a 30-frame rolling mean.
        self.data_table['median_low'] = self.data_table['median'].rolling(window=30).mean()
        # Plot ground truth vs. the smoothed estimate.
        ax = self.data_table[['gt', 'median_low']].plot(figsize=(8, 5))
        ax.axvline(x=num_training_frames, color='k', ls='--')
        ax.set_xlim(0, 4000)
        ax.set_xlabel('Frame num.')
        ax.set_ylabel('Speed [m/s]')
        ax.figure.savefig('result.png', bbox_inches='tight')
        plt.show()
def main():
    """Print the module docstring and run the estimator on drive.mp4."""
    print(__doc__)
    App('drive.mp4').run()
    # Close any windows left open by the (optional) visualization.
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
gt_raw = np.array(json.load(open('drive.json'))) drive.json... failed
Could you please share the mp4 files?
Dear Nikolov,
Spectacular work, but could you upload drive.json, or at least point out what ground truth is used in the code?
Dear Nikolov,
I find your work interesting, but could you upload the drive.json and drive.mp4 files, if possible?
Thank you.
Best regards