Skip to content

Instantly share code, notes, and snippets.

@Erol444
Created March 3, 2022 12:31
Show Gist options
  • Save Erol444/5946c78e5f78f30d3c06ce6692cf6efc to your computer and use it in GitHub Desktop.
Save Erol444/5946c78e5f78f30d3c06ce6692cf6efc to your computer and use it in GitHub Desktop.
DepthAI feature tracking from video file
#!/usr/bin/env python3
import cv2
import depthai as dai
from collections import deque
import numpy as np
# Input video source: replace the placeholder path below with the video file
# to run feature tracking on. Frames are read from this capture in the main loop.
cap = cv2.VideoCapture("/path/to/video.mp4")
class FeatureTrackerDrawer:
    """Keep a short position history per tracked feature and draw it on frames.

    Each feature ID maps to a deque of its most recent positions. The history
    length is controlled by an OpenCV trackbar created in ``__init__``; the
    selected value is stored on the class (not the instance), so it is shared
    by every drawer.
    """

    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    # for how many frames the feature is tracked
    trackedFeaturesPathLength = 10

    # Per-instance state; replaced with fresh containers in __init__.
    trackedIDs = None
    trackedFeaturesPath = None

    def onTrackBar(self, val):
        """Trackbar callback: store the newly selected path length on the class."""
        FeatureTrackerDrawer.trackedFeaturesPathLength = val

    def trackFeaturePath(self, features):
        """Update the stored paths from the features reported for one frame.

        Args:
            features: iterable of objects exposing ``id`` and ``position``.

        The position of every reported feature is appended to its path, the
        path is trimmed to the configured length (never below one point), and
        paths of features that were tracked before but are absent from
        ``features`` are dropped.
        """
        # A trackbar value of 0 must still keep the current position.
        maxLength = max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)

        newTrackedIDs = set()
        for feature in features:
            featureID = feature.id
            newTrackedIDs.add(featureID)

            path = self.trackedFeaturesPath.setdefault(featureID, deque())
            path.append(feature.position)
            # The limit may have been lowered since the last frame, so more
            # than one old point can need removing.
            while len(path) > maxLength:
                path.popleft()

        # Forget features that disappeared in this frame.
        for staleID in self.trackedIDs - newTrackedIDs:
            self.trackedFeaturesPath.pop(staleID, None)

        self.trackedIDs = newTrackedIDs

    def drawFeatures(self, img):
        """Draw every stored path on ``img`` in place.

        Consecutive positions are joined by lines and the newest position is
        marked with a filled circle. The trackbar is also re-synchronised with
        the current path-length setting.
        """
        cv2.setTrackbarPos(self.trackbarName, self.windowName, FeatureTrackerDrawer.trackedFeaturesPathLength)

        for path in self.trackedFeaturesPath.values():
            points = [(int(point.x), int(point.y)) for point in path]
            if not points:
                continue

            for src, dst in zip(points, points[1:]):
                cv2.line(img, src, dst, self.lineColor, 1, cv2.LINE_AA, 0)

            # Mark the most recent position of the feature.
            cv2.circle(img, points[-1], self.circleRadius, self.pointColor, -1, cv2.LINE_AA, 0)

    def __init__(self, trackbarName, windowName):
        """Create the display window and its path-length trackbar.

        Args:
            trackbarName: label of the trackbar controlling the path length.
            windowName: name of the OpenCV window that hosts the trackbar.
        """
        self.trackbarName = trackbarName
        self.windowName = windowName
        cv2.namedWindow(windowName)
        cv2.createTrackbar(trackbarName, windowName, FeatureTrackerDrawer.trackedFeaturesPathLength, FeatureTrackerDrawer.maxTrackedFeaturesPathLength, self.onTrackBar)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()
# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs
# Host -> device stream carrying the video frames to run tracking on.
# Created with pipeline.create(...) like every other node in this file.
xinFrame = pipeline.create(dai.node.XLinkIn)
xinFrame.setStreamName("inFrame")

# Host -> device stream used to send updated tracker configuration at runtime.
xinTrackedFeaturesConfig = pipeline.create(dai.node.XLinkIn)
xinTrackedFeaturesConfig.setStreamName("trackedFeaturesConfig")

featureTrackerColor = pipeline.create(dai.node.FeatureTracker)
xinFrame.out.link(featureTrackerColor.inputImage)
xinTrackedFeaturesConfig.out.link(featureTrackerColor.inputConfig)

# Device -> host stream with the tracked features for each frame.
xoutTrackedFeaturesColor = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesColor.setStreamName("trackedFeaturesColor")
featureTrackerColor.outputFeatures.link(xoutTrackedFeaturesColor.input)

# By default the least amount of resources is allocated;
# increasing it improves performance
numShaves = 2
numMemorySlices = 2
featureTrackerColor.setHardwareResources(numShaves, numMemorySlices)

# Host-side copy of the tracker configuration; it is modified and re-sent to
# the device when the motion estimator is switched.
featureTrackerConfig = featureTrackerColor.initialConfig.get()

print("Press 's' to switch between Lucas-Kanade optical flow and hardware accelerated motion estimation!")
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    """Resize an image and flatten it with its last axis moved to the front.

    Args:
        arr: image array; the resized result must be 3-dimensional because it
            is transposed with three axes.
        shape: target size handed to ``cv2.resize`` as its size argument.

    Returns:
        A 1-D array holding the resized image in (axis 2, axis 0, axis 1) order.
    """
    resized = cv2.resize(arr, shape)
    planar = resized.transpose(2, 0, 1)
    return planar.flatten()
# Size of the frames sent to the device and shown on screen.
FRAME_WIDTH = 1280
FRAME_HEIGHT = 720


def bgr_to_nv12(bgr: np.ndarray) -> np.ndarray:
    """Convert a BGR image to a flat NV12 buffer.

    NV12 is a full-resolution Y plane followed by one half-resolution plane of
    interleaved U/V samples. OpenCV's I420 output stores U and V as separate
    planes, so they are interleaved here. Width and height must be even.

    Args:
        bgr: BGR image as a (height, width, 3) uint8 array.

    Returns:
        1-D uint8 array of length ``width * height * 3 // 2`` in NV12 layout.
    """
    height, width = bgr.shape[:2]
    i420 = cv2.cvtColor(bgr, cv2.COLOR_BGR2YUV_I420).reshape(-1)

    lumaSize = width * height
    chromaSize = lumaSize // 4
    luma = i420[:lumaSize]
    uPlane = i420[lumaSize:lumaSize + chromaSize]
    vPlane = i420[lumaSize + chromaSize:]

    # Stack as (n, 2) pairs, then flatten -> U0 V0 U1 V1 ...
    interleavedUV = np.stack((uPlane, vPlane), axis=1).reshape(-1)
    return np.concatenate((luma, interleavedUV))


if not cap.isOpened():
    print("Could not open the input video; check the path passed to cv2.VideoCapture.")

# Connect to device and start pipeline
try:
    with dai.Device(pipeline) as device:
        # Output queues used to receive the results
        outputFeaturesColorQueue = device.getOutputQueue("trackedFeaturesColor", 8, False)
        inputFeatureTrackerConfigQueue = device.getInputQueue("trackedFeaturesConfig")
        inFrameQ = device.getInputQueue("inFrame")

        colorWindowName = "color"
        colorFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", colorWindowName)

        motionEstimatorType = dai.FeatureTrackerConfig.MotionEstimator.Type

        # Process the video frame by frame until it ends or 'q' is pressed.
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break

            frame720p = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

            # The frame is declared as NV12, so the payload must really be
            # NV12 (Y plane + interleaved UV), not planar YV12.
            img = dai.ImgFrame()
            img.setType(dai.ImgFrame.Type.NV12)
            img.setData(bgr_to_nv12(frame720p))
            img.setWidth(FRAME_WIDTH)
            img.setHeight(FRAME_HEIGHT)
            inFrameQ.send(img)

            # One result is read for every frame sent.
            trackedFeaturesColor = outputFeaturesColorQueue.get().trackedFeatures
            colorFeatureDrawer.trackFeaturePath(trackedFeaturesColor)
            colorFeatureDrawer.drawFeatures(frame720p)

            # Show the frame
            cv2.imshow(colorWindowName, frame720p)

            key = cv2.waitKey(1)
            if key == ord('q'):
                break
            elif key == ord('s'):
                # Toggle the motion estimator and push the new config to the device.
                if featureTrackerConfig.motionEstimator.type == motionEstimatorType.LUCAS_KANADE_OPTICAL_FLOW:
                    featureTrackerConfig.motionEstimator.type = motionEstimatorType.HW_MOTION_ESTIMATION
                    print("Switching to hardware accelerated motion estimation")
                else:
                    featureTrackerConfig.motionEstimator.type = motionEstimatorType.LUCAS_KANADE_OPTICAL_FLOW
                    print("Switching to Lucas-Kanade optical flow")

                cfg = dai.FeatureTrackerConfig()
                cfg.set(featureTrackerConfig)
                inputFeatureTrackerConfigQueue.send(cfg)
finally:
    # Release the video file and close the preview window even on errors.
    cap.release()
    cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment