Skip to content

Instantly share code, notes, and snippets.

@Merwanski
Last active July 20, 2024 13:04
Show Gist options
  • Save Merwanski/bef218ff3b7472d5ac787e1392116fd7 to your computer and use it in GitHub Desktop.
Save Merwanski/bef218ff3b7472d5ac787e1392116fd7 to your computer and use it in GitHub Desktop.
How to extract point cloud data from ros1 bag file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Extract a folder of ply from a rosbag.
python3 bag_to_ply.py FILE_NAME.bag FOLDER_NAME TOPIC_NAME
e.g.,
python3 bag_to_ply.py FILE_NAME.bag FOLDER_NAME /point_cloud/cloud_registered
"""
import os
import argparse
import open3d as o3d
import numpy as np
import rosbag
from sensor_msgs.msg import PointCloud2
def convert_pointcloud2_to_numpy(point_cloud_msg):
dtype_list = []
for field in point_cloud_msg.fields:
if field.name in ['x', 'y', 'z', 'rgb']:
dtype_list.append((field.name, np.float32))
dtype = np.dtype(dtype_list)
data = np.frombuffer(point_cloud_msg.data, dtype=dtype)
points = np.zeros((data.shape[0], 6))
points[:, :3] = np.vstack([data['x'], data['y'], data['z']]).T
rgb = data['rgb'].view(np.uint32)
points[:, 3] = ((rgb >> 16) & 0x0000ff) / 255.0
points[:, 4] = ((rgb >> 8) & 0x0000ff) / 255.0
points[:, 5] = (rgb & 0x0000ff) / 255.0
return points
def main():
parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
parser.add_argument("bag_file", help="Input ROS bag.")
parser.add_argument("output_dir", help="Output directory.")
parser.add_argument("pointcloud_topic", help="pointcloud topic.")
args = parser.parse_args()
print("Extract images from %s on topic %s into %s" % (args.bag_file,
args.pointcloud_topic,
args.output_dir))
bag = rosbag.Bag(args.bag_file, "r")
count = 0
for topic, msg, t in bag.read_messages(topics=[args.pointcloud_topic]):
raw_3d_data = msg
# format(msg.header.stamp.nsecs*1e-9, '.9f')
timestampeValue = format(msg.header.stamp.secs+msg.header.stamp.nsecs*1e-9, '.9f')
print("[INFO] 3d data received .... height x width = {} x {} pixels".format(raw_3d_data.height, raw_3d_data.width))
point_cloud_np = convert_pointcloud2_to_numpy(raw_3d_data)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(point_cloud_np)
o3d.io.write_point_cloud(os.path.join(args.output_dir, timestampeValue+"_frame%06i.pcd" % count), pcd)
# DEBUG ONLY - Visualize the first point cloud data
if count == 0:
o3d.visualization.draw_geometries([pcd])
count += 1
bag.close()
return
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment