@lucasmediaflow
Created July 26, 2021 04:44
#!/usr/bin/env python3
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
from time import monotonic, sleep
import threading
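# Build the DepthAI pipeline: a 12 MP colour camera whose stills are MJPEG-encoded and
# streamed to the host; the host decodes each still, downscales it to 300x300 and sends
# it back over XLink to an on-device MobileNet-SSD detector.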
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(dai.OpenVINO.Version.VERSION_2021_2)
camera = pipeline.createColorCamera()
camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_12_MP)
# camera.setStillSize(4032,3040)
camera.setInterleaved(False)
camera.setPreviewKeepAspectRatio(False)
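# MobileNet-SSD detection network; it receives frames from the host over XLink
# ('frameIn') rather than directly from the camera.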
neuralNetwork = pipeline.createMobileNetDetectionNetwork()
neuralNetwork.setBlobPath(str(Path('/home/pi/oak/models/mobilenet-ssd_openvino_2021.2_8shave.blob')))
frameInXLinkIn = pipeline.createXLinkIn()
nnXLinkOut = pipeline.createXLinkOut()
frameInXLinkIn.setStreamName('frameIn')
nnXLinkOut.setStreamName('nnXLinkOut')
neuralNetwork.setConfidenceThreshold(0.5)
neuralNetwork.setNumInferenceThreads(2)
neuralNetwork.input.setBlocking(False)
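# MJPEG encoder for full-resolution stills so they can be sent to the host.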
jpegEncoder = pipeline.createVideoEncoder()
jpegEncoder.setDefaultProfilePreset(camera.getStillSize(), 1, dai.VideoEncoderProperties.Profile.MJPEG)
jpegEncoderXLinkOut = pipeline.createXLinkOut()
jpegEncoderXLinkOut.setStreamName('jpegEncoderXLinkOut')
controller = pipeline.createXLinkIn()
controller.setStreamName('controller')
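# Link the nodes: host control -> camera, host frames -> NN, NN results -> host,
# camera stills -> MJPEG encoder -> host.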
controller.out.link(camera.inputControl)
frameInXLinkIn.out.link(neuralNetwork.input)
neuralNetwork.out.link(nnXLinkOut.input)
camera.still.link(jpegEncoder.input)
jpegEncoder.bitstream.link(jpegEncoderXLinkOut.input)
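# Connect to the device, upload the pipeline and create the host-side queues.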
with dai.Device(pipeline) as device:
    controllerQ = device.getInputQueue('controller')
    frameInQ = device.getInputQueue(name='frameIn')
    neuralNetworkQ = device.getOutputQueue(name='nnXLinkOut', maxSize=1, blocking=False)
    jpegQ = device.getOutputQueue('jpegEncoderXLinkOut')

    frame = None
    detections = []
    ctrl = dai.CameraControl()
    # ctrl.setCaptureStill(True)
    # controllerQ.send(ctrl)
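    # Optional background thread: keeps requesting a still capture every 100 ms
    # until killThread is set (left disabled below).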
    def takePic():
        global ctrl, controllerQ, killThread
        while True:
            ctrl.setCaptureStill(True)
            controllerQ.send(ctrl)
            sleep(0.1)
            if killThread:
                break

    killThread = False
    takePicThread = threading.Thread(name='runTakePic', target=takePic)
    # takePicThread.start()
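    # frameNorm maps normalized (0..1) bounding-box coordinates to pixel coordinates,
    # and to_planar resizes a BGR frame and flattens it into the planar layout the NN expects.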
    def frameNorm(frame, bbox):
        normVals = np.full(len(bbox), frame.shape[0])
        normVals[::2] = frame.shape[1]
        return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

    def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

    def displayFrame(name, frame):
        for detection in detections:
            print('left', detection.xmin, 'right', detection.xmax, 'top', detection.ymin, 'bottom', detection.ymax)
            bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
            cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (255, 0, 0), 2)
        # Show the frame
        cv2.imshow(name, frame)
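    # Main loop: decode any incoming JPEG stills, resize them to 300x300 and feed them
    # back to the on-device detector, then draw the latest detections and handle key presses.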
    while True:
        for jpegFrame in jpegQ.tryGetAll():
            print('got jpeg frames')
            frame = cv2.imdecode(jpegFrame.getData(), cv2.IMREAD_UNCHANGED)
            sendBackImg = dai.ImgFrame()
            sendBackImg.setData(to_planar(frame, (300, 300)))
            sendBackImg.setTimestamp(monotonic())
            sendBackImg.setWidth(300)
            sendBackImg.setHeight(300)
            frameInQ.send(sendBackImg)
            # cv2.imwrite('/home/lucas/oak/output/testy.jpg', frame)
        nnFrames = neuralNetworkQ.tryGet()
        if frame is not None and nnFrames is not None:
            detections = nnFrames.detections
            displayFrame('framey', frame)
        else:
            print('------------------NO NN DETECTIONS------------------')
            logo = cv2.imread('/home/pi/mediaflow-logo.png')
            cv2.imshow('window', logo)

        key = cv2.waitKey(1)
        if key == ord('q'):
            print('q pressed, quitting')
            killThread = True
            break
        elif key == ord('c'):
            print('take photo!')
            # ctrl = dai.CameraControl()
            ctrl.setCaptureStill(True)
            controllerQ.send(ctrl)