Skip to content

Instantly share code, notes, and snippets.

@davidliyutong
Created January 19, 2024 14:00
Show Gist options
  • Save davidliyutong/24739f220122eed1eea9aeb3d69ca7bf to your computer and use it in GitHub Desktop.
Save davidliyutong/24739f220122eed1eea9aeb3d69ca7bf to your computer and use it in GitHub Desktop.
Capture PointCloud with Realsense Cameras
import logging
from typing import List, Optional, Tuple
import numpy as np
import pyrealsense2 as rs
from pydantic import BaseModel
import open3d as o3d
import cv2
import py_cli_interaction
import datetime
import os
# Color Config, must match Intel Realsense Viewer
class RealsenseCameraColorCfg(BaseModel):
width: int = 1920
height: int = 1080
format: str = 'rs.format.bgr8'
fps: int = 30
exposure: int = 0
use_auto_exposure: bool = True
# Depth Config, must match Intel Realsense Viewer
class RealsenseCameraDepthCfg(BaseModel):
width: int = 1024
height: int = 768
format: str = 'rs.format.z16'
fps: int = 30
preset: str = 'Short Range'
# Bundled Config
class RealsenseCameraCfg(BaseModel):
sn: str
depth: RealsenseCameraDepthCfg = RealsenseCameraDepthCfg()
color: RealsenseCameraColorCfg = RealsenseCameraColorCfg()
def get_device_by_cfg(cfg: RealsenseCameraCfg) -> Optional[rs.device]:
"""Input configuration, output device handle
Args:
cfg (RealsenseCameraCfg): configuration
Returns:
Optional[rs.device]: device handle
"""
context = rs.context()
for d in context.devices:
if d.get_info(rs.camera_info.name).lower() != 'platform camera':
sn = d.get_info(rs.camera_info.serial_number)
if sn == cfg.sn:
return d
return None
class RealsenseCameraModel:
rs_config: Optional[rs.config] = None
pipline: Optional[rs.pipeline] = None
device: Optional[rs.device] = None
intrinsics_matrix: Optional[List[List[float]]] = None
align: Optional[rs.align] = None
def __init__(self,
camera_cfg: RealsenseCameraCfg,
):
self.option = camera_cfg
def open(self):
self.rs_config = rs.config()
# configure config
self.rs_config.enable_stream(rs.stream.depth,
self.option.depth.width,
self.option.depth.height,
eval(self.option.depth.format),
self.option.depth.fps)
self.rs_config.enable_stream(rs.stream.color,
self.option.color.width,
self.option.color.height,
eval(self.option.color.format),
self.option.color.fps)
self.rs_config.enable_device(self.option.sn)
# get device
self.device = get_device_by_cfg(self.option)
assert self.device is not None, f"Device {self.option.sn} not found"
# configure depth preset
depth_sensor = self.device.first_depth_sensor()
preset_range = depth_sensor.get_option_range(rs.option.visual_preset)
for i in range(int(preset_range.max) + 1):
visual_preset = depth_sensor.get_option_value_description(rs.option.visual_preset, i)
if visual_preset == self.option.depth.preset:
depth_sensor.set_option(rs.option.visual_preset, i)
# configure RGB exposure
sensor = self.device.query_sensors()[1]
if self.option.color.use_auto_exposure:
sensor.set_option(rs.option.enable_auto_exposure, 1)
else:
sensor.set_option(rs.option.exposure, self.option.color.exposure)
# configure alignment (depth -> color)
self.align = rs.align(rs.stream.color)
# self.align = rs.align(rs.stream.depth)
def start(self):
# create pipeline
self.pipeline = rs.pipeline()
profile = self.pipeline.start(self.rs_config)
# set intrinsics
intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics()
self.intrinsics_matrix = [
[intrinsics.fx, 0., intrinsics.ppx],
[0., intrinsics.fy, intrinsics.ppy],
[0., 0., 1.]
]
logging.debug(f"camera {self.option.sn} intrinsics: {self.intrinsics_matrix}")
def get_frames(self, timeout_ms=5000) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
ret, frames = self.pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
if not ret:
return None, None
else:
pass
try:
aligned_frames = self.align.process(frames)
except RuntimeError as e:
return None, None
depth_frame = aligned_frames.get_depth_frame()
color_frame = aligned_frames.get_color_frame()
if depth_frame:
depth_image = np.asanyarray(depth_frame.get_data())
else:
depth_image = None
color_image = np.asanyarray(color_frame.get_data()) if color_frame else None
return color_image, depth_image
def close(self):
logging.debug(f"closing camera {self.option.sn}")
if self.pipeline is not None:
try:
self.pipeline.stop()
except RuntimeError as e:
pass
if self.device is not None:
self.device.hardware_reset()
@staticmethod
def rgbd2pc(
color_raw,
depth_raw,
camera_intrinsic,
depth_scale=1000.,
depth_trunc=10.0,
):
if isinstance(color_raw, np.ndarray):
color_raw = o3d.geometry.Image(cv2.cvtColor(color_raw.astype(np.uint8), cv2.COLOR_BGRA2RGB)) # Convert to RGB
if isinstance(depth_raw, np.ndarray):
depth_raw = o3d.geometry.Image(depth_raw.astype(np.uint16))
if isinstance(camera_intrinsic, tuple):
width, height, intrinsic_matrix = camera_intrinsic
camera_intrinsic = o3d.camera.PinholeCameraIntrinsic(int(width), int(height), intrinsic_matrix[0][0], intrinsic_matrix[1][1], intrinsic_matrix[0][2], intrinsic_matrix[1][2])
rgbd_image = o3d.geometry.RGBDImage().create_from_color_and_depth(
color_raw,
depth_raw,
depth_scale,
depth_trunc,
convert_rgb_to_intensity=False
)
pcd = o3d.geometry.PointCloud().create_from_rgbd_image(
image=rgbd_image,
intrinsic=camera_intrinsic,
extrinsic=np.eye(4),
)
return pcd
def visualize_pcd(pcd: o3d.geometry.PointCloud):
coordinate = o3d.geometry.TriangleMesh().create_coordinate_frame(size=0.1, origin=[0, 0, 0])
o3d.visualization.draw_geometries([coordinate, pcd])
def main(args):
# prepare folder
if not os.path.exists('pcd'):
os.makedirs('pcd', exist_ok=True)
# prepare camera
if args.sn != '':
cam = RealsenseCameraModel(RealsenseCameraCfg(sn=args.sn))
else:
cam = RealsenseCameraModel(RealsenseCameraCfg(sn=py_cli_interaction.must_parse_cli_string('Serial Number:')))
cam.open()
cam.start()
try:
while True:
if py_cli_interaction.must_parse_cli_bool('Capture ?', default_value=True):
pass
else:
continue
color, depth = cam.get_frames()
pc = cam.rgbd2pc(
color,
depth,
(cam.option.color.width, cam.option.color.height, cam.intrinsics_matrix),
depth_scale=4000.,
)
visualize_pcd(pc)
if py_cli_interaction.must_parse_cli_bool('Save ?', default_value=True):
o3d.io.write_point_cloud(os.path.join('pcd', f'{datetime.datetime.now().strftime("%Y%m%d-%H%M%S")}.ply'), pc)
print("Saved")
except KeyboardInterrupt as e:
cam.close()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--sn', type=str, default='')
main(parser.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment