DepthAI running age-gender demo from video
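A two-stage DepthAI pipeline fed from a video file instead of the on-device camera: frames are pushed to the device over XLinkIn, a face-detection network (face-detection-retail-0004) finds faces, an on-device Script node crops each face out of the full frame, and a second network (age-gender-recognition-retail-0013) estimates age and gender per crop. The script expects a demo.mp4 next to it, plus MultiMsgSync.py providing TwoStageHostSeqSync (the host-side two-stage synchronizer used in Luxonis' depthai-experiments demos); a minimal sketch of that class is included after the script in case you don't have the file.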
from MultiMsgSync import TwoStageHostSeqSync
import blobconverter
import cv2
import depthai as dai
import numpy as np
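
# NN bounding boxes come out normalized to [0..1]; frame_norm maps them to
# pixel coordinates of the given frame.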
def frame_norm(frame, bbox):
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
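
# Pipeline topology: XLinkIn ("frame-in") feeds both an XLinkOut loopback ("color")
# and an ImageManip that resizes to 300x300 for face detection. Detections go to
# the host ("detection") and to a Script node, which emits one crop config + frame
# per detected face for a second ImageManip; the 62x62 crops then run through the
# age/gender network, whose results stream out on "recognition".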
def create_pipeline(stereo):
    pipeline = dai.Pipeline()

    cam_xin = pipeline.create(dai.node.XLinkIn)
    cam_xin.setStreamName("frame-in")

    # Loop the frame straight back to the host. The host already has it, so this
    # is strictly redundant, but it keeps sequence numbers aligned for syncing.
    cam_xout = pipeline.create(dai.node.XLinkOut)
    cam_xout.setStreamName("color")
    cam_xin.out.link(cam_xout.input)
    # ImageManip will resize the frame before sending it to the face detection NN node
    face_det_manip = pipeline.create(dai.node.ImageManip)
    face_det_manip.initialConfig.setResize(300, 300)
    face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
    cam_xin.out.link(face_det_manip.inputImage)

    face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
    face_det_nn.setConfidenceThreshold(0.5)
    face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
    face_det_manip.out.link(face_det_nn.input)

    # Send face detections to the host (for bounding boxes)
    face_det_xout = pipeline.create(dai.node.XLinkOut)
    face_det_xout.setStreamName("detection")
    face_det_nn.out.link(face_det_xout.input)

    # The Script node takes the face detection NN output as input and sends an
    # ImageManipConfig per face to the crop ImageManip, cropping the initial frame
    image_manip_script = pipeline.create(dai.node.Script)
    face_det_nn.out.link(image_manip_script.inputs['face_det_in'])

    # We only use the passthrough for its metadata (sequence number), so the
    # script can pair each detection with the matching frame
    face_det_nn.passthrough.link(image_manip_script.inputs['passthrough'])
    image_manip_script.setScript("""
    l = [] # List of images

    # Trim the list so the frame matching `seq` is first in it.
    # For this experiment this function is redundant, since everything
    # runs in blocking mode, so no frames will get lost.
    def get_latest_frame(seq):
        global l
        for i, frame in enumerate(l):
            if seq == frame.getSequenceNum():
                # node.warn(f"List len {len(l)} Frame with same seq num: {i}, seq {seq}")
                l = l[i:]
                break
        return l[0]

    def correct_bb(bb):
        if bb.xmin < 0: bb.xmin = 0.001
        if bb.ymin < 0: bb.ymin = 0.001
        if bb.xmax > 1: bb.xmax = 0.999
        if bb.ymax > 1: bb.ymax = 0.999
        return bb

    while True:
        preview = node.io['preview'].tryGet()
        if preview is not None:
            node.warn(f"New frame {preview.getSequenceNum()}, total {len(l)}")
            l.append(preview)

        face_dets = node.io['face_det_in'].tryGet()
        # node.warn(f"Faces detected: {len(face_dets)}")
        if face_dets is not None:
            passthrough = node.io['passthrough'].get()
            seq = passthrough.getSequenceNum()
            # node.warn(f"New detection {seq}")
            if len(l) == 0:
                continue
            img = get_latest_frame(seq)

            for i, det in enumerate(face_dets.detections):
                cfg = ImageManipConfig()
                correct_bb(det)
                cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
                # node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
                cfg.setResize(62, 62)
                cfg.setKeepAspectRatio(False)
                node.io['manip_cfg'].send(cfg)
                node.io['manip_img'].send(img)
    """)
    cam_xin.out.link(image_manip_script.inputs['preview'])

    crop_manip = pipeline.create(dai.node.ImageManip)
    crop_manip.initialConfig.setResize(62, 62)
    crop_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['manip_cfg'].link(crop_manip.inputConfig)
    image_manip_script.outputs['manip_img'].link(crop_manip.inputImage)

    # Age/gender second-stage NN
    print("Creating Age Gender Neural Network...")
    recognition_nn = pipeline.create(dai.node.NeuralNetwork)
    recognition_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
    crop_manip.out.link(recognition_nn.input)

    recognition_nn_xout = pipeline.create(dai.node.XLinkOut)
    recognition_nn_xout.setStreamName("recognition")
    recognition_nn.out.link(recognition_nn_xout.input)

    return pipeline
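
# OpenCV frames are interleaved HWC (BGR); ImgFrame expects planar CHW,
# so resize and transpose before flattening.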
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
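
# Host side: read frames from demo.mp4, push them to the device over the
# "frame-in" queue, then collect the "color", "detection" and "recognition"
# streams and let TwoStageHostSeqSync pair them up by sequence number.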
with dai.Device(create_pipeline(False)) as device:
    sync = TwoStageHostSeqSync()
    queues = {}
    # Create output queues
    for name in ["color", "detection", "recognition"]:
        queues[name] = device.getOutputQueue(name)
    xin = device.getInputQueue("frame-in")

    cap = cv2.VideoCapture("demo.mp4")
    cnt = 0
    while cap.isOpened():
        read_correctly, frame = cap.read()
        if not read_correctly:
            break

        img = dai.ImgFrame()
        print("Sending frame")
        frame = cv2.resize(frame, (1080, 1080))
        img.setData(to_planar(frame, (1080, 1080)))
        img.setType(dai.ImgFrame.Type.BGR888p)
        img.setWidth(1080)
        img.setHeight(1080)
        img.setSequenceNum(cnt)
        print(img.getSequenceNum(), cnt)
        xin.send(img)
        cnt += 1
        for name, q in queues.items():
            # Add all msgs (object detections and recognitions) to the Sync class
            if name == "color":
                print("wait for color frame")
                sync.add_msg(q.get(), name)  # Block to get the frame
                print("got color frame")
            else:
                if q.has():
                    sync.add_msg(q.get(), name)

        msgs = sync.get_msgs()
        if msgs is not None:
            print("Synced!")
            frame = msgs["color"].getCvFrame()
            detections = msgs["detection"].detections
            recognitions = msgs["recognition"]
            for i, detection in enumerate(detections):
                bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))

                # age-gender-recognition-retail-0013 outputs 'age_conv3' (age / 100)
                # and 'prob' (softmax over [female, male])
                rec = recognitions[i]
                age = int(float(np.squeeze(np.array(rec.getLayerFp16('age_conv3')))) * 100)
                gender = np.squeeze(np.array(rec.getLayerFp16('prob')))
                gender_str = "female" if gender[0] > gender[1] else "male"

                cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (10, 245, 10), 2)
                y = (bbox[1] + bbox[3]) // 2
                # Draw each label twice (thick black outline, thin white fill) for readability
                cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (0, 0, 0), 8)
                cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 255, 255), 2)
                cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (0, 0, 0), 8)
                cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 255, 255), 2)

            cv2.imshow("Camera", frame)

        if cv2.waitKey(1) == ord('q'):
            break
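
MultiMsgSync.py itself is not included in this gist. If you don't have the version from Luxonis' depthai-experiments at hand, the sketch below is a minimal stand-in, assuming the usual contract: messages are bucketed by sequence number, and a bucket is released once it holds the color frame, the detection message, and one recognition result per detected face. The stream names ("color", "detection", "recognition") match the queues created above.

# MultiMsgSync.py -- minimal sketch of the two-stage host-side synchronizer
class TwoStageHostSeqSync:
    def __init__(self):
        # Maps sequence number -> {"color": ..., "detection": ..., "recognition": [...]}
        self.msgs = {}

    def add_msg(self, msg, name):
        seq = str(msg.getSequenceNum())
        if seq not in self.msgs:
            self.msgs[seq] = {"recognition": []}
        if name == "recognition":
            # One recognition result arrives per detected face
            self.msgs[seq]["recognition"].append(msg)
        else:
            self.msgs[seq][name] = msg

    def get_msgs(self):
        seq_remove = []  # Sequence numbers up to the synced one get dropped
        for seq, msgs in self.msgs.items():
            seq_remove.append(seq)
            # Synced once the frame, the detections, and one recognition
            # per detection are all present for this sequence number
            if "color" in msgs and "detection" in msgs:
                if len(msgs["recognition"]) == len(msgs["detection"].detections):
                    for rm in seq_remove:
                        del self.msgs[rm]
                    return msgs
        return None  # Nothing synced yet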