DepthAI vehicle speed estimation on OAK - Luxonis
#!/usr/bin/env python3
import argparse
import cv2
import depthai as dai
import blobconverter
import numpy as np
from libraries.depthai_replay import Replay
import time
from time import monotonic
import math
import filterpy
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
from scipy.linalg import block_diag
labelMap = ['background', 'car']
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--path', default="data", type=str, help="Path to the recorded data that will be replayed")
parser.add_argument('-t', '--threshold', default=0.15, type=float,
                    help="Minimum distance the vehicle has to move (across the x/y axis) to be considered a real movement")
args = parser.parse_args()
# Create Replay object
replay = Replay(args.path)
# Minimum distance the vehicle has to move (across the x/y axis) to be considered a real movement
THRESH_DIST_DELTA = args.threshold
# Initialize the pipeline. This will create required XLinkIn's and connect them together
pipeline, nodes = replay.init_pipeline()
# Resize color frames prior to sending them to the device
replay.set_resize_color((672, 384))
# Keep aspect ratio when resizing the color frames. This will crop
# the color frame to the desired aspect ratio (in our case 672x384)
replay.keep_aspect_ratio(True)
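# Note: 672x384 matches the input resolution of the vehicle-detection-adas-0002 model used below
# (its documented input shape is 1x3x384x672, i.e. 384 high by 672 wide).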
# Create and configure the network
nn = pipeline.createMobileNetSpatialDetectionNetwork()
nn.setBoundingBoxScaleFactor(0.3)
nn.setDepthLowerThreshold(5000) # at least 5m
nn.setDepthUpperThreshold(20000) # 20m
nn.setBlobPath(str(blobconverter.from_zoo(name="vehicle-detection-adas-0002", shaves=6)))
nn.setConfidenceThreshold(0.5)
nn.input.setBlocking(False)
# Create and configure the object tracker
objectTracker = pipeline.createObjectTracker()
objectTracker.setDetectionLabelsToTrack([1]) # Track cars
# possible tracking types: ZERO_TERM_COLOR_HISTOGRAM, ZERO_TERM_IMAGELESS, SHORT_TERM_IMAGELESS, SHORT_TERM_KCF
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
# Take the smallest available ID when a new object is tracked; possible options: SMALLEST_ID, UNIQUE_ID
objectTracker.setTrackerIdAssigmentPolicy(dai.TrackerIdAssigmentPolicy.SMALLEST_ID)
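# With SMALLEST_ID, the ID of a removed tracklet can be reused for a new one. The per-ID Kalman
# filter state kept in CarTracker below is deleted when a tracklet is REMOVED, so a reused ID
# starts again with a fresh filter.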
# Link required inputs to the Spatial detection network
nodes.color.out.link(nn.input)
nodes.stereo.setSubpixel(True)
nodes.stereo.depth.link(nn.inputDepth)
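# Subpixel disparity gives finer depth resolution at longer distances, which helps here since
# only detections between 5 m and 20 m are kept (see the depth thresholds above).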
detOut = pipeline.createXLinkOut()
detOut.setStreamName("det_out")
nn.out.link(detOut.input)
depthOut = pipeline.createXLinkOut()
depthOut.setStreamName("depth_out")
nodes.stereo.disparity.link(depthOut.input)
# Link NN outputs to the object tracker
nn.passthrough.link(objectTracker.inputTrackerFrame)
nn.passthrough.link(objectTracker.inputDetectionFrame)
nn.out.link(objectTracker.inputDetections)
# Send tracklets to the host
trackerOut = pipeline.createXLinkOut()
trackerOut.setStreamName("tracklets")
objectTracker.out.link(trackerOut.input)
# Send RGB preview frames to the host
xlinkOut = pipeline.createXLinkOut()
xlinkOut.setStreamName("preview")
objectTracker.passthroughTrackerFrame.link(xlinkOut.input)
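# Pipeline overview (for orientation; everything above builds this graph):
#   replayed color frames -> MobileNetSpatialDetectionNetwork (vehicle-detection-adas-0002)
#   depth from the replayed stereo pair -> same network (for spatial coordinates); disparity -> "depth_out"
#   NN passthrough frame + detections -> ObjectTracker -> "tracklets" stream to the host
#   ObjectTracker passthrough frame -> "preview" stream; raw NN detections -> "det_out"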
class TextHelper:
    def __init__(self) -> None:
        self.bg_color = (0, 0, 0)
        self.color = (255, 255, 255)
        self.text_type = cv2.FONT_HERSHEY_SIMPLEX
        self.line_type = cv2.LINE_AA
    def putText(self, frame, text, coords):
        cv2.putText(frame, text, coords, self.text_type, 1.0, self.bg_color, 3, self.line_type)
        cv2.putText(frame, text, coords, self.text_type, 1.0, self.color, 1, self.line_type)
    def rectangle(self, frame, bbox):
        x1, y1, x2, y2 = bbox
        cv2.rectangle(frame, (x1, y1), (x2, y2), self.bg_color, 3)
        cv2.rectangle(frame, (x1, y1), (x2, y2), self.color, 1)
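# Example TextHelper usage (hypothetical values; the tracking loop below currently draws with
# direct cv2 calls instead):
#   text = TextHelper()
#   text.rectangle(frame, (x1, y1, x2, y2))
#   text.putText(frame, "12.3 m/s", (x1, y1 - 10))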
# Class for tracking cars and estimating the speed of the vehicles
class CarTracker:
    def __init__(self):
        self.startTime = time.monotonic()
        self.counter = 0
        self.fps = 0
        self.frame = None
        self.color = (255, 0, 0)
        self.tracking = {}
        self.car_counter = [0, 0]  # [0] = Y axis (up/down), [1] = X axis (left/right)
        self.statusMap = {
            dai.Tracklet.TrackingStatus.NEW: "NEW",
            dai.Tracklet.TrackingStatus.TRACKED: "TRACKED",
            dai.Tracklet.TrackingStatus.LOST: "LOST",
            dai.Tracklet.TrackingStatus.REMOVED: "REMOVED"}
        self.dictionary = {}
        self.covarianceMatrix = {}
        self.objIDs = []
    def to_planar(self, arr: np.ndarray, shape: tuple) -> np.ndarray:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
    def tracklet_removed(self, coords1, coords2):
        deltaX = coords2[0] - coords1[0]
        deltaY = coords2[1] - coords1[1]
        if abs(deltaX) > abs(deltaY) and abs(deltaX) > THRESH_DIST_DELTA:
            direction = "left" if 0 > deltaX else "right"
            self.car_counter[1] += 1 if 0 > deltaX else -1
            print(f"Car moved {direction}")
            # print("DeltaX: " + str(abs(deltaX)))
        if abs(deltaY) > abs(deltaX) and abs(deltaY) > THRESH_DIST_DELTA:
            direction = "up" if 0 > deltaY else "down"
            self.car_counter[0] += 1 if 0 > deltaY else -1
            print(f"Car moved {direction}")
            # print("DeltaY: " + str(abs(deltaY)))
        # else: print("Invalid movement")
    def get_centroid(self, roi):
        x1 = roi.topLeft().x
        y1 = roi.topLeft().y
        x2 = roi.bottomRight().x
        y2 = roi.bottomRight().y
        return ((x2 - x1) / 2 + x1, (y2 - y1) / 2 + y1)
    def euclidean_distance(self, x1, y1, z1, x2, y2, z2):
        dx, dy, dz = x1 - x2, y1 - y2, z1 - z2
        distance = math.sqrt(dx ** 2 + dy ** 2 + dz ** 2)
        return distance
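    # For reference, a naive speed estimate could be computed from two consecutive spatial readings
    # (hypothetical sketch; `prev` and `curr` are (x, y, z) tuples in meters, `dt_s` the elapsed seconds):
    #   speed_naive = self.euclidean_distance(*prev, *curr) / dt_s
    # Raw spatial readings are noisy, which is why check_queues() below filters them with a Kalman filter.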
    def check_queues(self, preview, tracklets):
        imgFrame = preview.tryGet()
        track = tracklets.tryGet()
        if imgFrame is not None:
            self.counter += 1
            current_time = time.monotonic()
            if (current_time - self.startTime) > 1:
                self.fps = self.counter / (current_time - self.startTime)
                self.counter = 0
                self.startTime = current_time
            self.frame = imgFrame.getCvFrame()
        if track is not None:
            trackletsData = track.tracklets
            for t in trackletsData:
                roi = t.roi.denormalize(self.frame.shape[1], self.frame.shape[0])
                x1 = int(roi.topLeft().x)
                y1 = int(roi.topLeft().y)
                x2 = int(roi.bottomRight().x)
                y2 = int(roi.bottomRight().y)
                try:
                    label = labelMap[t.label]
                except:
                    label = t.label
                # If new tracklet, save its centroid
                if t.status == dai.Tracklet.TrackingStatus.NEW:
                    self.tracking[str(t.id)] = self.get_centroid(t.roi)
                # If tracklet was removed, check the "path" of this tracklet
                if t.status == dai.Tracklet.TrackingStatus.REMOVED:
                    self.tracklet_removed(self.tracking[str(t.id)], self.get_centroid(t.roi))
                x_spatial = "{:.1f}".format(t.spatialCoordinates.x / 1000)
                y_spatial = "{:.1f}".format(t.spatialCoordinates.y / 1000)
                z_spatial = "{:.1f}".format(t.spatialCoordinates.z / 1000)
                # cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (79, 79, 47), -1)
                # cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (250, 206, 135), 2)
                # cv2.putText(self.frame, f"ID: {t.id}", (x1 + 5, y1 - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                if (int(t.id) in self.dictionary and int(t.id) in self.objIDs
                        and t.status in (dai.Tracklet.TrackingStatus.TRACKED,
                                         dai.Tracklet.TrackingStatus.NEW,
                                         dai.Tracklet.TrackingStatus.LOST)):
                    values = self.dictionary[int(t.id)]  # the values list holds the Kalman filter state vector for that object
                    print(values)
                    # fps1 = values.pop(0)
                    values = np.array([[values.pop(0), values.pop(0), values.pop(0), values.pop(0), values.pop(0), values.pop(0)]]).T
                    print("SHAPE")
                    print(values.shape)
                    KF = KalmanFilter(dim_x=6, dim_z=3)
                    KF.x = values
                    dt = 1.0
                    # KF.P = np.eye(6) * 1000.  # covariance matrix
                    KF.P = self.covarianceMatrix[int(t.id)]
                    KF.F = np.array([[1, dt, 0, 0, 0, 0],
                                     [0, 1, 0, 0, 0, 0],
                                     [0, 0, 1, dt, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 1, dt],
                                     [0, 0, 0, 0, 0, 1]])  # state transition matrix
                    q = Q_discrete_white_noise(dim=3, dt=dt, var=0.05)  # process uncertainty with variance 0.05
                    KF.Q = block_diag(q, q)
                    KF.H = np.array([[0, 1, 0, 0, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 0, 1]])  # measurement function: defines how we go from state variables to a measurement using z = Hx
                    KF.R = np.array([[5, 0, 0],
                                     [0, 5, 0],
                                     [0, 0, 5]])  # measurement uncertainty
                    z = np.array([[float(t.spatialCoordinates.x / 1000)], [float(t.spatialCoordinates.y / 1000)], [float(t.spatialCoordinates.z / 1000)]])
                    KF.predict()  # predict the next state (prior) using the Kalman filter state propagation equations
                    KF.update(z)  # add the new measurement z and estimate the current state of the system given that measurement
                    x = KF.x
                    P = KF.P
                    print(x)
                    print("Values")
                    print(values)
                    speed = math.sqrt(abs(x[1, 0] * x[1, 0]) + abs(x[3, 0] * x[3, 0]) + abs(x[5, 0] * x[5, 0]))
                    speed = "{:.1f}".format(speed)
                    print(speed)
                    # cv2.putText(self.frame, f"ID: {t.id}", (x1 + 10, y1 - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 0, 255), 2, cv2.LINE_AA)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (79, 79, 47), -1)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (250, 206, 135), 2)
                    cv2.putText(self.frame, f"ID: {t.id}", (x1 + 5, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    cv2.putText(self.frame, f"{speed} m/s", (x1 + 5, y1 - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    # cv2.rectangle(self.frame, (x1 - 10, y1 - 70), (x2 + 20, y2 - 60), (0, 0, 0), -1)
                    x2p = x[0, 0]  # updated x spatial coordinate
                    x2v = x[1, 0]  # updated Vx
                    y2p = x[2, 0]  # updated y spatial coordinate
                    y2v = x[3, 0]  # updated Vy
                    z2p = x[4, 0]  # updated z spatial coordinate
                    z2v = x[5, 0]  # updated Vz
                    values = []
                    # values.append(self.fps)
                    values.append(x2p)
                    values.append(x2v)
                    values.append(y2p)
                    values.append(y2v)
                    values.append(z2p)
                    values.append(z2v)
                    self.dictionary[int(t.id)] = values
                    del self.covarianceMatrix[int(t.id)]
                    self.covarianceMatrix[int(t.id)] = P
                elif int(t.id) not in self.dictionary and int(t.id) not in self.objIDs and t.status == dai.Tracklet.TrackingStatus.NEW:
                    self.objIDs.append(int(t.id))
                    KF = KalmanFilter(dim_x=6, dim_z=3)
                    # The state vector holds the spatial coordinates (x, y, z) and their velocities, so dim_x = 6.
                    # The measurement vector holds the spatial coordinates (x, y, z) that the camera reads, so dim_z = 3.
                    KF.x = np.array([[0, 0, 0, 0, 0, 0]]).T  # initial state vector (spatial coordinates and their velocities)
                    dt = 1.0
                    KF.P = np.eye(6) * 1000.  # initial covariance matrix
                    KF.F = np.array([[1, dt, 0, 0, 0, 0],  # state transition matrix
                                     [0, 1, 0, 0, 0, 0],
                                     [0, 0, 1, dt, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 1, dt],
                                     [0, 0, 0, 0, 0, 1]])
                    q = Q_discrete_white_noise(dim=3, dt=dt, var=0.05)  # process uncertainty/noise matrix with variance 0.05
                    KF.Q = block_diag(q, q)
                    KF.H = np.array([[0, 1, 0, 0, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 0, 1]])  # measurement function: defines how we go from state variables to a measurement using z = Hx
                    KF.R = np.array([[5, 0, 0],  # measurement uncertainty
                                     [0, 5, 0],
                                     [0, 0, 5]])
                    z = np.array([[float(t.spatialCoordinates.x / 1000)], [float(t.spatialCoordinates.y / 1000)], [float(t.spatialCoordinates.z / 1000)]])
                    KF.predict()  # predict the next state (prior) using the Kalman filter state propagation equations
                    KF.update(z)  # add the new measurement z and estimate the current state of the system given that measurement
                    x = KF.x  # updated state, used as the state vector in the next step
                    P = KF.P  # updated covariance matrix P
                    print(x)
                    print("SHAPE: ")
                    print(x.shape)
                    # Calculate speed using the equation V = sqrt((Vx * Vx) + (Vy * Vy) + (Vz * Vz))
                    speed = math.sqrt(abs(x[1, 0] * x[1, 0]) + abs(x[3, 0] * x[3, 0]) + abs(x[5, 0] * x[5, 0]))
                    speed = "{:.1f}".format(speed)
                    print(speed)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (79, 79, 47), -1)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (250, 206, 135), 2)
                    cv2.putText(self.frame, f"ID: {t.id}", (x1 + 5, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    cv2.putText(self.frame, f"{speed} m/s", (x1 + 5, y1 - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    values = []
                    # values.append(self.fps)
                    values.append(x[0, 0])  # x spatial coordinate
                    values.append(x[1, 0])  # Vx
                    values.append(x[2, 0])  # y spatial coordinate
                    values.append(x[3, 0])  # Vy
                    values.append(x[4, 0])  # z spatial coordinate
                    values.append(x[5, 0])  # Vz
                    print(values)
                    self.dictionary[int(t.id)] = values
                    self.covarianceMatrix[int(t.id)] = P
                elif int(t.id) in self.dictionary and t.status == dai.Tracklet.TrackingStatus.REMOVED:
                    del self.dictionary[int(t.id)]
                    del self.covarianceMatrix[int(t.id)]
                    self.objIDs.remove(int(t.id))
        if self.frame is not None:
            # cv2.putText(self.frame, "NN fps: {:.2f}".format(self.fps), (2, self.frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, self.color)
            # cv2.putText(self.frame, f"Counter X: {self.car_counter[1]}, Counter Y: {self.car_counter[0]}", (3, 20), cv2.FONT_HERSHEY_TRIPLEX, 0.7, (255, 255, 0))
            cv2.imshow("tracker", self.frame)
with dai.Device(pipeline) as device:
    replay.create_queues(device)
    carTracker = CarTracker()
    depthQ = device.getOutputQueue(name="depth_out", maxSize=4, blocking=False)
    detQ = device.getOutputQueue(name="det_out", maxSize=4, blocking=False)
    tracklets = device.getOutputQueue(name="tracklets", maxSize=4, blocking=False)
    preview = device.getOutputQueue("preview", maxSize=4, blocking=False)
    disparityMultiplier = 255 / nodes.stereo.initialConfig.getMaxDisparity()
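    # getMaxDisparity() accounts for the stereo settings above (e.g. subpixel mode), so scaling by
    # 255 / maxDisparity maps raw disparity values into 0-255 for color-mapped visualization.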
    color = (255, 0, 0)
    queue = []
    # Read rgb/mono frames, send them to the device and wait for the spatial object detection results
    while replay.send_frames():
        # rgbFrame = replay.lastFrame['color']
        depthFrame = depthQ.get().getFrame()
        depthFrameColor = (depthFrame * disparityMultiplier).astype(np.uint8)
        # depthFrameColor = cv2.normalize(depthFrame, None, 255, 0, cv2.NORM_INF, cv2.CV_8UC1)
        # depthFrameColor = cv2.equalizeHist(depthFrameColor)
        depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_JET)
        cv2.imshow("depth", depthFrameColor)
        carTracker.check_queues(preview, tracklets)
        if cv2.waitKey(1) == ord('q'):
            break
    print('End of the recording')