DepthAI gen2 RGBD pointcloud
#!/usr/bin/env python3
import cv2
import numpy as np
import depthai as dai
import argparse
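# Assumed dependencies (the gist does not pin any):
#   pip install depthai opencv-python numpy
# The -pcl visualizer additionally relies on Open3D via the external
# projector_3d helper (see the note where it is imported below).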
'''
If one or more of the additional depth modes (lrcheck, extended, subpixel)
are enabled, then:
 - depth output is FP16. TODO enable U16.
 - median filtering is disabled on device. TODO enable.
 - with subpixel, either depth or disparity has valid data.
Otherwise, depth output is U16 (mm) and median is functional.
But like on Gen1, either depth or disparity has valid data. TODO enable both.
'''
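# Note on subpixel packing (an assumption based on the 5-bit fractional
# comment in convert_to_cv2_frame below): with 32 fractional levels, a raw
# uint16 disparity of 1376 encodes 1376 / 32 = 43.0 pixels of true disparity.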
parser = argparse.ArgumentParser()
parser.add_argument("-pcl", "--pointcloud", help="enables point cloud conversion and visualization", default=False, action="store_true")
parser.add_argument("-static", "--static_frames", default=False, action="store_true",
                    help="Run stereo on static frames passed from host 'dataset' folder")
args = parser.parse_args()
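# Example invocation (the script name is whatever you saved this gist as):
#   python3 main.py -pcl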
point_cloud = args.pointcloud  # Create point cloud visualizer. Depends on 'out_rectified'

# StereoDepth config options. TODO move to command line options
source_camera = not args.static_frames
out_depth = False      # Disparity by default
out_rectified = True   # Output and display rectified streams
lrcheck = False        # Better handling for occlusions
extended = True        # Closer-in minimum depth, disparity range is doubled
subpixel = False       # Better accuracy for longer distance, fractional disparity 32-levels
# Options: MEDIAN_OFF, KERNEL_3x3, KERNEL_5x5, KERNEL_7x7
median = dai.StereoDepthProperties.MedianFilter.KERNEL_7x7

# Sanitize some incompatible options
if lrcheck or extended or subpixel:
    median = dai.StereoDepthProperties.MedianFilter.MEDIAN_OFF  # TODO

print("StereoDepth config options:")
print("    Left-Right check:   ", lrcheck)
print("    Extended disparity: ", extended)
print("    Subpixel:           ", subpixel)
print("    Median filtering:   ", median)
# TODO add API to read this from device / calib data
right_intrinsic = [[860.0, 0.0, 640.0], [0.0, 860.0, 360.0], [0.0, 0.0, 1.0]]
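# A hedged alternative on newer depthai releases, which expose calibration
# through CalibrationHandler (requires an open device, so it cannot run at
# this point in the script):
#   calib = device.readCalibration()
#   right_intrinsic = calib.getCameraIntrinsics(dai.CameraBoardSocket.RIGHT, 1280, 720)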
pcl_converter = None
if point_cloud:
    if out_rectified:
        try:
            from projector_3d import PointCloudVisualizer
        except ImportError as e:
            raise ImportError(f"\033[1;5;31mError occurred when importing PCL projector: {e}. Try disabling the point cloud\033[0m")
        pcl_converter = PointCloudVisualizer(right_intrinsic, 1280, 720)
    else:
        print("Disabling point-cloud visualizer, as out_rectified is not set")
def create_stereo_depth_pipeline():
    print("Creating Stereo Depth pipeline: ", end='')
    print("MONO CAMS -> STEREO -> XLINK OUT")
    pipeline = dai.Pipeline()

    cam_left = pipeline.createMonoCamera()
    cam_right = pipeline.createMonoCamera()
    stereo = pipeline.createStereoDepth()
    # xout_depth = pipeline.createXLinkOut()

    cam_left.setBoardSocket(dai.CameraBoardSocket.LEFT)
    cam_right.setBoardSocket(dai.CameraBoardSocket.RIGHT)
    for cam in [cam_left, cam_right]:  # Common config
        cam.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)

    stereo.setOutputDepth(out_depth)
    stereo.setOutputRectified(out_rectified)
    stereo.setConfidenceThreshold(200)
    stereo.setRectifyEdgeFillColor(0)  # Black, to better see the cutout
    stereo.setMedianFilter(median)  # KERNEL_7x7 default
    stereo.setLeftRightCheck(lrcheck)
    stereo.setExtendedDisparity(extended)
    stereo.setSubpixel(subpixel)

    cam_left.out.link(stereo.left)
    cam_right.out.link(stereo.right)

    # if from_camera:
    #     # Default: EEPROM calib is used, and resolution taken from MonoCamera nodes
    #     #stereo.loadCalibrationFile(path)
    #     pass
    # else:
    #     stereo.setEmptyCalibration()  # Set if the input frames are already rectified
    #     stereo.setInputResolution(1280, 720)

    xout_disparity = pipeline.createXLinkOut()
    xout_disparity.setStreamName('disparity')
    stereo.disparity.link(xout_disparity.input)

    cam = pipeline.createColorCamera()
    cam.setPreviewSize(1280, 720)
    cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    cam.setInterleaved(False)
    cam.setBoardSocket(dai.CameraBoardSocket.RGB)
    xout_preview = pipeline.createXLinkOut()
    xout_preview.setStreamName('rgb_preview')
    cam.preview.link(xout_preview.input)

    streams = ['rgb_preview', 'disparity']
    return pipeline, streams
rgb_frame = None

# The operations done here seem very CPU-intensive, TODO
def convert_to_cv2_frame(name, image):
    global rgb_frame
    baseline = 75  # mm
    focal = right_intrinsic[0][0]
    max_disp = 96
    disp_type = np.uint8
    disp_levels = 1
    if extended:
        max_disp *= 2
    if subpixel:
        max_disp *= 32
        disp_type = np.uint16  # 5 bits fractional disparity
        disp_levels = 32
    data, w, h = image.getData(), image.getWidth(), image.getHeight()
    # TODO check image frame type instead of name
    if name == 'rgb_preview':
        frame = image.getCvFrame()
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    elif name == 'disparity':
        disp = np.array(data).astype(np.uint8).view(disp_type).reshape((h, w))
        # Compute depth from disparity (scaled by disp_levels in subpixel mode)
        with np.errstate(divide='ignore'):  # Should be safe to ignore div by zero here
            depth = (disp_levels * baseline * focal / disp).astype(np.uint16)
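        # Worked example with the intrinsics above: focal = 860 px and
        # baseline = 75 mm, so a disparity of 43 px maps to a depth of
        # 860 * 75 / 43 = 1500 mm (1.5 m); larger disparity means closer.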
        if 1:  # Optionally, extend disparity range to better visualize it
            frame = (disp * 255. / max_disp).astype(np.uint8)
        if 1:  # Optionally, apply a color map
            frame = cv2.applyColorMap(frame, cv2.COLORMAP_HOT)
            #frame = cv2.applyColorMap(frame, cv2.COLORMAP_JET)
        if pcl_converter is not None:
            if rgb_frame is not None:  # Option 1: project colorized disparity
                pcl_converter.rgbd_to_projection(depth, rgb_frame, True)
                pcl_converter.visualize_pcd()
    return frame
def test_pipeline():
    pipeline, streams = create_stereo_depth_pipeline()

    print("Creating DepthAI device")
    with dai.Device(pipeline) as device:
        print("Starting pipeline")
        device.startPipeline()

        # Create a receive queue for each stream
        q_list = []
        for s in streams:
            q = device.getOutputQueue(s, 8, blocking=False)
            q_list.append(q)

        while True:
            # Handle output streams
            for q in q_list:
                name = q.getName()
                image = q.tryGet()
                #print("Received frame:", name)
                # Skip some streams for now, to reduce CPU load
                if name in ['left', 'right', 'depth']: continue
                if image is not None:
                    frame = convert_to_cv2_frame(name, image)
                    cv2.imshow(name, frame)
            if cv2.waitKey(1) == ord('q'):
                break


test_pipeline()