@Erol444
Created February 26, 2024 17:55
DepthAI auto-reconnect
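Runs MobileNet-SSD object detection on the color camera preview of an OAK device and automatically reconnects when the device drops off or the connection fails.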
#!/usr/bin/env python3
import cv2
import depthai as dai
import numpy as np
import time
import blobconverter
# MobilenetSSD label texts
labelMap = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
"diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
xoutRgb = pipeline.create(dai.node.XLinkOut)
nnOut = pipeline.create(dai.node.XLinkOut)
nnNetworkOut = pipeline.create(dai.node.XLinkOut)
xoutRgb.setStreamName("rgb")
nnOut.setStreamName("nn")
nnNetworkOut.setStreamName("nnNetwork")
# Properties
camRgb.setPreviewSize(300, 300)
camRgb.setInterleaved(False)
camRgb.setFps(40)
# Define a neural network that will make predictions based on the source frames
nn.setConfidenceThreshold(0.5)
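# blobconverter fetches the mobilenet-ssd blob from the model zoo and caches it locally (needs internet on first run)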
nn.setBlobPath(blobconverter.from_zoo(name="mobilenet-ssd", shaves=6))
nn.input.setBlocking(False)
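# Linking: camera preview feeds the NN; passthrough frames, detections and raw network output go to the host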
nn.passthrough.link(xoutRgb.input)
camRgb.preview.link(nn.input)
nn.out.link(nnOut.input)
nn.outNetwork.link(nnNetworkOut.input)
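# DeviceInfo of the device we are (or were) connected to; None until the first connection succeeds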
deviceInfo: dai.DeviceInfo = None
def connect_and_run():
    global deviceInfo
    # Reconnect to the same device if we already know it, otherwise take the first available one
    device = dai.Device(pipeline) if deviceInfo is None else dai.Device(pipeline, deviceInfo)
    with device:
        # Remember which device we connected to, so the next attempt reconnects to the same one
        deviceInfo = device.getDeviceInfo()

        # Output queues will be used to get the rgb frames and nn data from the outputs defined above
        qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
        qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
        qNN = device.getOutputQueue(name="nnNetwork", maxSize=4, blocking=False)

        frame = None
        detections = []
        startTime = time.monotonic()
        counter = 0
        color2 = (255, 255, 255)
        # nn data (bounding box locations) are in <0..1> range - they need to be normalized with frame width/height
        def frameNorm(frame, bbox):
            normVals = np.full(len(bbox), frame.shape[0])
            normVals[::2] = frame.shape[1]
            return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

        def displayFrame(name, frame):
            color = (255, 0, 0)
            for detection in detections:
                bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
                cv2.putText(frame, labelMap[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
                cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
                cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            # Show the frame
            cv2.imshow(name, frame)
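        # The blocking .get() calls below wait for the next message from the device; if the device
        # disconnects they typically raise a RuntimeError, which the retry loop at the bottom catches.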
        printOutputLayersOnce = True

        while True:
            inRgb = qRgb.get()
            inDet = qDet.get()
            inNN = qNN.get()

            if inRgb is not None:
                frame = inRgb.getCvFrame()
                cv2.putText(frame, "NN fps: {:.2f}".format(counter / (time.monotonic() - startTime)),
                            (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, color2)

            if inDet is not None:
                detections = inDet.detections
                counter += 1

            if printOutputLayersOnce and inNN is not None:
                toPrint = 'Output layer names:'
                for ten in inNN.getAllLayerNames():
                    toPrint = f'{toPrint} {ten},'
                print(toPrint)
                printOutputLayersOnce = False

            # If the frame is available, draw bounding boxes on it and show the frame
            if frame is not None:
                displayFrame("rgb", frame)

            if cv2.waitKey(1) == ord('q'):
                break
# Retry loop: if the connection fails or the device disconnects mid-run, wait 2 seconds and
# connect again; exit only when connect_and_run() returns normally (user pressed 'q').
while True:
    try:
        connect_and_run()
        break
    except RuntimeError as e:
        print(f"Runtime error: {e}. Retrying in 2 seconds...")
    except Exception as e:
        print(f"Error: {e}. Retrying in 2 seconds...")
    time.sleep(2)
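To run the script, install the dependencies first, for example: python3 -m pip install depthai blobconverter opencv-python numpy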