Created January 15, 2024 20:58
import os
import cv2
import open3d as o3d
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import CompressedImage
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
import pcl
import time
import ros2_numpy as rnp
class DataExtractor(Node):
def __init__(self):
script_directory = os.path.dirname(os.path.abspath(__file__))
self.image_directory = os.path.join(script_directory, 'dataExtracted', 'leftImages')
self.point_cloud_directory = os.path.join(script_directory, 'dataExtracted', 'pointCloud')
# Performance Monitoring
self.last_save_time = time.time()
self.last_image_save_time = time.time()
self.save_warning_threshold = 0.5 # seconds
self.image_save_warning_threshold = 0.5 # seconds
self.get_logger().info('Data Extractor is ready.')
def image_callback(self, msg):
# Extract timestamp from the message
timestamp = msg.header.stamp
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6
# Create the directory if it doesn't exist
os.makedirs(self.image_directory, exist_ok=True)
# Construct the file path
image_filename = os.path.join(self.image_directory, f'{timestamp_ms:.4f}ms.png')
# Skip saving if the file already exists
if os.path.exists(image_filename):
image_data = np.frombuffer(, np.uint8)
image_np = cv2.imdecode(image_data, cv2.IMREAD_UNCHANGED)
start_time = time.time()
cv2.imwrite(image_filename, image_np)
end_time = time.time()
# Print the time taken to save
save_duration = end_time - start_time
self.get_logger().info(f'Saved image: {image_filename}, Time taken: {save_duration:.4f} seconds')
# Performance Monitoring
current_time = time.time()
frame_duration = current_time - self.last_image_save_time
self.last_image_save_time = current_time
# Issue a warning if saving can't keep up with the frame rate
if frame_duration > self.image_save_warning_threshold:
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds')
except Exception as e:
self.get_logger().error(f'Error processing image: {str(e)}')
def point_cloud_callback(self, msg):
# Extract timestamp from the message
timestamp = msg.header.stamp
timestamp_ms = timestamp.sec * 1000 + timestamp.nanosec / 1e6
# get point cloud
data = rnp.numpify(msg)
dataXYZ = data['xyz']
# Convert PointCloud2 message to PCL PointCloud
# Pass xyz to Open3D.o3d.geometry.PointCloud and visualize
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(dataXYZ)
# Remove NaN values
point_cloud = pcd
cleaned_points = np.asarray(point_cloud.points)
cleaned_points = cleaned_points[~np.isnan(cleaned_points).any(axis=1)]
point_cloud.points = o3d.utility.Vector3dVector(cleaned_points)
# Visualize the cleaned point cloud
# o3d.visualization.draw_geometries([point_cloud])
# Create the directory if it doesn't exist
os.makedirs(self.point_cloud_directory, exist_ok=True)
# Save the PCL PointCloud as a PCD file
ply_filename = os.path.join(self.point_cloud_directory, f'{timestamp_ms:.4f}ms.ply')
# Skip saving if the file already exists
if os.path.exists(ply_filename):
start_time = time.time(), pcd)
end_time = time.time()
# Print the time taken to save
save_duration = end_time - start_time
self.get_logger().info(f'Saved point cloud: {ply_filename}, Time taken: {save_duration:.4f} seconds')
# Performance Monitoring
current_time = time.time()
frame_duration = current_time - self.last_save_time
self.last_save_time = current_time
# Issue a warning if saving can't keep up with the frame rate
if frame_duration > self.save_warning_threshold:
self.get_logger().warning(f'Saving may not keep up with frame rate. Frame duration: {frame_duration:.4f} seconds')
except Exception as e:
self.get_logger().error(f'Error processing point cloud: {str(e)}')
def main():
data_extractor = DataExtractor()
if __name__ == '__main__':
