Skip to content

Instantly share code, notes, and snippets.

@Erol444
Created April 15, 2021 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Erol444/e83aad8ae2ef439077b7d79397e0bb99 to your computer and use it in GitHub Desktop.
Save Erol444/e83aad8ae2ef439077b7d79397e0bb99 to your computer and use it in GitHub Desktop.
DepthAI gen2 RGBD pointcloud
#!/usr/bin/env python3
import cv2
import numpy as np
import depthai as dai
from time import sleep
import datetime
import argparse
'''
If one or more of the additional depth modes (lrcheck, extended, subpixel)
are enabled, then:
- depth output is FP16. TODO enable U16.
- median filtering is disabled on device. TODO enable.
- with subpixel, either depth or disparity has valid data.
Otherwise, depth output is U16 (mm) and median is functional.
But like on Gen1, either depth or disparity has valid data. TODO enable both.
'''
parser = argparse.ArgumentParser()
parser.add_argument("-pcl", "--pointcloud", help="enables point cloud convertion and visualization", default=False, action="store_true")
parser.add_argument("-static", "--static_frames", default=False, action="store_true",
help="Run stereo on static frames passed from host 'dataset' folder")
args = parser.parse_args()
point_cloud = args.pointcloud # Create point cloud visualizer. Depends on 'out_rectified'
# StereoDepth config options. TODO move to command line options
source_camera = not args.static_frames
out_depth = False # Disparity by default
out_rectified = True # Output and display rectified streams
lrcheck = False # Better handling for occlusions
extended = True # Closer-in minimum depth, disparity range is doubled
subpixel = False # Better accuracy for longer distance, fractional disparity 32-levels
# Options: MEDIAN_OFF, KERNEL_3x3, KERNEL_5x5, KERNEL_7x7
median = dai.StereoDepthProperties.MedianFilter.KERNEL_7x7
# Sanitize some incompatible options
if lrcheck or extended or subpixel:
median = dai.StereoDepthProperties.MedianFilter.MEDIAN_OFF # TODO
print("StereoDepth config options:")
print(" Left-Right check: ", lrcheck)
print(" Extended disparity:", extended)
print(" Subpixel: ", subpixel)
print(" Median filtering: ", median)
# TODO add API to read this from device / calib data
right_intrinsic = [[860.0, 0.0, 640.0], [0.0, 860.0, 360.0], [0.0, 0.0, 1.0]]
pcl_converter = None
if point_cloud:
if out_rectified:
try:
from projector_3d import PointCloudVisualizer
except ImportError as e:
raise ImportError(f"\033[1;5;31mError occured when importing PCL projector: {e}. Try disabling the point cloud \033[0m ")
pcl_converter = PointCloudVisualizer(right_intrinsic, 1280, 720)
else:
print("Disabling point-cloud visualizer, as out_rectified is not set")
def create_stereo_depth_pipeline():
print("Creating Stereo Depth pipeline: ", end='')
print("MONO CAMS -> STEREO -> XLINK OUT")
pipeline = dai.Pipeline()
cam_left = pipeline.createMonoCamera()
cam_right = pipeline.createMonoCamera()
stereo = pipeline.createStereoDepth()
# xout_depth = pipeline.createXLinkOut()
cam_left .setBoardSocket(dai.CameraBoardSocket.LEFT)
cam_right.setBoardSocket(dai.CameraBoardSocket.RIGHT)
for cam in [cam_left, cam_right]: # Common config
cam.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
stereo.setOutputDepth(out_depth)
stereo.setOutputRectified(out_rectified)
stereo.setConfidenceThreshold(200)
stereo.setRectifyEdgeFillColor(0) # Black, to better see the cutout
stereo.setMedianFilter(median) # KERNEL_7x7 default
stereo.setLeftRightCheck(lrcheck)
stereo.setExtendedDisparity(extended)
stereo.setSubpixel(subpixel)
cam_left.out.link(stereo.left)
cam_right.out.link(stereo.right)
# if from_camera:
# # Default: EEPROM calib is used, and resolution taken from MonoCamera nodes
# #stereo.loadCalibrationFile(path)
# pass
# else:
# stereo.setEmptyCalibration() # Set if the input frames are already rectified
# stereo.setInputResolution(1280, 720)
xout_disparity = pipeline.createXLinkOut()
xout_disparity.setStreamName('disparity')
stereo.disparity.link(xout_disparity.input)
cam=pipeline.createColorCamera()
cam.setPreviewSize(1280, 720)
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setInterleaved(False)
cam.setBoardSocket(dai.CameraBoardSocket.RGB)
xout_preview = pipeline.createXLinkOut()
xout_preview.setStreamName('rgb_preview')
cam.preview.link(xout_preview.input)
streams = ['rgb_preview', 'disparity']
return pipeline, streams
rgb_frame = None
# The operations done here seem very CPU-intensive, TODO
def convert_to_cv2_frame(name, image):
global rgb_frame
baseline = 75 #mm
focal = right_intrinsic[0][0]
max_disp = 96
disp_type = np.uint8
disp_levels = 1
if (extended):
max_disp *= 2
if (subpixel):
max_disp *= 32;
disp_type = np.uint16 # 5 bits fractional disparity
disp_levels = 32
data, w, h = image.getData(), image.getWidth(), image.getHeight()
# TODO check image frame type instead of name
if name == 'rgb_preview':
frame = image.getCvFrame()
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
elif name == 'disparity':
disp = np.array(data).astype(np.uint8).view(disp_type).reshape((h, w))
# Compute depth from disparity (32 levels)
with np.errstate(divide='ignore'): # Should be safe to ignore div by zero here
depth = (disp_levels * baseline * focal / disp).astype(np.uint16)
if 1: # Optionally, extend disparity range to better visualize it
frame = (disp * 255. / max_disp).astype(np.uint8)
if 1: # Optionally, apply a color maps
frame = cv2.applyColorMap(frame, cv2.COLORMAP_HOT)
#frame = cv2.applyColorMap(frame, cv2.COLORMAP_JET)
if pcl_converter is not None:
if rgb_frame is not None: # Option 1: project colorized disparity
pcl_converter.rgbd_to_projection(depth, rgb_frame, True)
pcl_converter.visualize_pcd()
return frame
def test_pipeline():
pipeline, streams = create_stereo_depth_pipeline()
print("Creating DepthAI device")
with dai.Device(pipeline) as device:
print("Starting pipeline")
device.startPipeline()
# Create a receive queue for each stream
q_list = []
for s in streams:
q = device.getOutputQueue(s, 8, blocking=False)
q_list.append(q)
while True:
# Handle output streams
for q in q_list:
name = q.getName()
image = q.tryGet()
#print("Received frame:", name)
# Skip some streams for now, to reduce CPU load
if name in ['left', 'right', 'depth']: continue
if image is not None:
frame = convert_to_cv2_frame(name, image)
cv2.imshow(name, frame)
if cv2.waitKey(1) == ord('q'):
break
test_pipeline()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment