Skip to content

Instantly share code, notes, and snippets.

@alex-luxonis
Created September 17, 2021 13:48
Show Gist options
  • Save alex-luxonis/b1efb4b5cde7dad1ddd315ded837b2e9 to your computer and use it in GitHub Desktop.
Save alex-luxonis/b1efb4b5cde7dad1ddd315ded837b2e9 to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
import cv2
import depthai as dai
try:
from loguru import logger
except ImportError:
import logging as logger
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" # type: str
DATE_FORMAT = "%mm/%dd/%YY %HH:%MH:%SH " # type: str
logger.basicConfig(level=logger.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
def create_pipeline():
logger.info("Creating pipeline...")
pipeline = dai.Pipeline() # type: dai.Pipeline
logger.info("Creating Color Camera ...")
cam_rgb = pipeline.createColorCamera() # type: dai.node.ColorCamera
cam_rgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
cam_rgb.setIspScale(1, 2)
logger.info("Creating Left Camera ...")
cam_left = pipeline.createColorCamera() # type: dai.node.ColorCamera
cam_left.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam_left.setBoardSocket(dai.CameraBoardSocket.LEFT)
cam_left.setIspScale(1, 2)
logger.info("Creating Right Camera ...")
cam_right = pipeline.createColorCamera() # type: dai.node.ColorCamera
cam_right.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam_right.setBoardSocket(dai.CameraBoardSocket.RIGHT)
cam_right.setIspScale(1, 2)
cam_select = pipeline.createXLinkIn()
cam_select.setStreamName("cam_select")
image_manip_script = pipeline.create(dai.node.Script)
image_manip_script.setProcessor(dai.ProcessorType.LEON_CSS) # Default: LEON_MSS
# image_manip_script.inputs["cam_select"].setBlocking(False)
# image_manip_script.inputs["cam_select"].setQueueSize(1)
cam_select.out.link(image_manip_script.inputs["cam_select"])
cam_left.isp.link(image_manip_script.inputs["left_in"])
cam_right.isp.link(image_manip_script.inputs["right_in"])
cam_rgb.isp.link(image_manip_script.inputs["rgb_in"])
image_manip_script.setScript(
"""
from datetime import datetime, timedelta
def wait_for_results(queue):
# type: (dai.DataOutputQueue) -> bool
start = datetime.now()
while 1:
res = queue.tryGet()
if res is not None:
break
if datetime.now() - start > timedelta(seconds=1):
return False , res
return True, res
while 1 :
flag_ = node.io["cam_select"].get().getData().tolist()[0]
node.warn(f"flag_ = {flag_}")
if flag_ == 0:
has_results,result = wait_for_results(node.io["left_in"])
elif flag_ == 1:
has_results,result = wait_for_results(node.io["right_in"])
elif flag_ == 2:
has_results,result = wait_for_results(node.io["rgb_in"])
if not has_results:
node.warn(f"flag_ {flag_} no data")
node.io["to_manip"].send(result)
"""
)
cam_out = pipeline.createXLinkOut() # type: dai.node.XLinkOut
cam_out.setStreamName("cam_out")
image_manip_script.outputs["to_manip"].link(cam_out.input)
return pipeline
def wait_for_results(queue):
# type: (dai.DataOutputQueue) -> bool
start = datetime.now()
while not queue.has():
if datetime.now() - start > timedelta(seconds=1):
return False
return True
# @logger.catch
def main():
with dai.Device(create_pipeline()) as device:
# Set device log level - to see logs from the Script node
device.setLogLevel(dai.LogLevel.WARN)
device.setLogOutputLevel(dai.LogLevel.WARN)
logger.info("Starting pipeline ...")
cam_out = device.getOutputQueue("cam_out")
cam_select = device.getInputQueue("cam_select")
cam_select_buffer = dai.Buffer()
cam_list = {
0: "left",
1: "right",
2: "rgb",
}
cam_select_id = 0
while True:
cam_select_buffer.setData([cam_select_id])
cam_select.send(cam_select_buffer)
has_results = wait_for_results(cam_out)
if not has_results:
logger.warning(f"cam {cam_list.get(cam_select_id)} No data from nn!")
cam_select_id = (cam_select_id + 1) % 3
logger.warning(f"change to {cam_list.get(cam_select_id)}")
continue
frame = cam_out.tryGet()
cv2.imshow("frame", frame.getCvFrame())
key = cv2.waitKey(1)
if key in range(48, 51):
cam_select_id = key - 48
logger.info(f"change to {cam_list.get(cam_select_id)}")
elif key == 32:
cam_select_id = (cam_select_id + 1) % 3
logger.info(f"change to {cam_list.get(cam_select_id)}")
elif key == ord("q"):
break
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment