Skip to content

Instantly share code, notes, and snippets.

@horverno
Created May 19, 2021 08:22
Show Gist options
  • Save horverno/b8d72f996b84c0925f019f0c770a2eae to your computer and use it in GitHub Desktop.
Save horverno/b8d72f996b84c0925f019f0c770a2eae to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import sys
import os
import struct
import numpy as np
import rospy
from std_msgs.msg import Header
from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2, PointField
from image_geometry import PinholeCameraModel
import message_filters
from timeit import default_timer as timer
import tf # geometry2 transforms
from cv_bridge import CvBridge, CvBridgeError
import math
import cv2
class CameraLidarFusionLanesROSNode():
"""
Advertises a topic on ground pixels with depth information (x, y, z).
Args:
point_freq (int): Used lidar point frequency (1 is most dense).
Recommended values are 3 and 5.
"""
def __init__(self, point_freq=1):
self.CvBridge = CvBridge()
self.Camera_subscriber = message_filters.Subscriber(
#"/zed_node/left/image_rect_color/compressed", CompressedImage ###### ez ######
"/zed_node/right_raw/image_raw_color", Image ###### ez ######
)
self.Lidar_subscriber = message_filters.Subscriber(
"/right_os1/os1_cloud_node/points", PointCloud2
)
self.Synchronizer = message_filters.ApproximateTimeSynchronizer(
fs=[self.Camera_subscriber, self.Lidar_subscriber],
queue_size=1, # set of messages stored per 'message_filter'
slop=0.2, # range of delay for synchronization
allow_headerless=True
)
self.Synchronizer.registerCallback(self.fusion_callback)
self.CameraModel = PinholeCameraModel()
# translation
self.Trans = [0.6, -0.10, 0.0]
# rotation
self.Rot = [1.57, -1.57, 0.0] ###### ez ######
#self.Rot = [1.57, 1.57, 0.0] ###### ez ######
self.RotationMatrix = None
self.point_freq = point_freq
self.lidar_data = np.ones((4, 32768))[:, ::self.point_freq]
self.Points = None
self.PointsToCloud = None
# call camera calibration
self.calibrate_camera()
def calculate_rotation_matrix(self):
"""
Calculates rotation matrix from translation and rotation.
"""
self.Trans = tuple(self.Trans) + (1, )
self.RotationMatrix = np.asarray(
tf.transformations.euler_matrix(
self.Rot[0], self.Rot[1], self.Rot[2]
)
)
self.RotationMatrix[:, 3] = np.asarray(self.Trans)
def calibrate_camera(self):
"""
Calibrate camera and call rotation matrix calculation.
"""
camera_info = rospy.wait_for_message("/zed_node/left/camera_info", CameraInfo)
self.CameraModel.fromCameraInfo(camera_info)
print('[INFO] Calibration completed!')
self.calculate_rotation_matrix()
print('[INFO] Rotation Matrix calculated!')
def fusion_callback(self, camera_ros_msg, lidar_ros_msg):
"""
Main callback used in ApproximateTimeSynchronizer() to
fuse camera ground points (x, y) with depth information (z).
Args:
camera_ros_msg: Raw camera message from a message_filter.
pred_ros_msg: Segmentation message from a message_filter.
lidar_ros_msg: Raw lidar message from a message_filter.
"""
start_time = timer()
try:
np_arr = np.fromstring(camera_ros_msg.data, np.uint8) ###### ez ######
image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) ###### ez ######
#image = self.CvBridge.imgmsg_to_cv2(camera_ros_msg, "bgr8") ###### ez ######
except CvBridgeError as e:
print(e)
#self.Points = []
self.Points = [struct.unpack("<fff", lidar_ros_msg.data[idx:idx + 12]) for idx in range(0, len(lidar_ros_msg.data), 48 * self.point_freq)]
#for idx in range(0, len(lidar_ros_msg.data), 48 * self.point_freq):
# self.Points.append(struct.unpack("<fff", lidar_ros_msg.data[idx:idx + 12]))
self.Points = np.swapaxes(np.array(self.Points), 0, 1)
size = len(self.Points[0])
self.lidar_data[:-1, :] = self.Points
newlidardata = []
# dot exception occours sometimes that needs to be caught
# TODO: nem az osszeset transzformalni, kiszurni ami biztos nem esik a szegmentacio teruletere
# LINE PROFILER
try:
rotatedPointList = self.RotationMatrix.dot(self.lidar_data)
except AttributeError as e:
print(e)
return
x = rotatedPointList[0, :]
y = rotatedPointList[1, :]
z = rotatedPointList[2, :]
u = (self.CameraModel.fx() * x) / z + self.CameraModel.cx()
v = (self.CameraModel.fy() * y) / z + self.CameraModel.cy()
self.PointsToCloud = []
#self.PointsToCloud = [[self.lidar_data[0][i], self.lidar_data[1][i], self.lidar_data[2][i], image[int(v[i]), int(u[i]), 2] / 255.0, image[int(v[i]), int(u[i]), 1] / 255.0, image[int(v[i]), int(u[i]), 0] / 255.0, 0] for i in range(0, size) if not(math.isinf(v[i])) and not(math.isinf(u[i])) and self.lidar_data[0][i] <= 0.0 and int(v[i]) >= 0 and int(v[i]) < image.shape[0] and int(u[i]) >= 0 and int(u[i]) < image.shape[1]]
for i in range(0, size, 1):
try:
if (
###### ez ######
#self.lidar_data[0][i] >= 0.0 and \
###### ez ######
self.lidar_data[0][i] <= 0.0 and \
int(v[i]) >= 0 and int(v[i]) < image.shape[0] and \
int(u[i]) >= 0 and int(u[i]) < image.shape[1]
):
(b, g, r) = image[int(v[i]), int(u[i]), :]
self.PointsToCloud.append(
[self.lidar_data[0][i], self.lidar_data[1][i], self.lidar_data[2][i],
r / 255.0, g / 255.0, b / 255.0, 0
]
)
except:
pass
#dim 0 components --> main elements, dim 1 components --> (x, y, z, r, g, b, a)
self.PointsToCloud = np.array(self.PointsToCloud)
# ROS message attributes for publishing
ros_dtype = PointField.FLOAT32
dtype = np.float32
itemsize = np.dtype(dtype).itemsize
pcdata = self.PointsToCloud.astype(dtype).tobytes()
# create fields with list comprehension
fields = [
PointField(name=n,
offset=i * itemsize,
datatype=ros_dtype,
count=1
) for i, n in enumerate('xyzrgba')
]
header = Header(frame_id="right_os1/os1_lidar", stamp=rospy.Time.now())
# Publish as ROS topic
lane_xyz_data = PointCloud2(
header=header,
height=1,
width=self.PointsToCloud.shape[0],
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=(itemsize * 7),
row_step=(itemsize * 7 * self.PointsToCloud.shape[0]),
data=pcdata
)
publisher = rospy.Publisher('camera_stream/lidar_fusion/lanes', PointCloud2, queue_size=1)
publisher.publish(lane_xyz_data)
end_time = timer()
delta = end_time - start_time
print("[INFO] Callback Time: {} s".format(delta))
print("[INFO] FPS: {} Hz".format(1 / delta))
def main(self):
rospy.spin()
if __name__ == '__main__':
rospy.init_node('fusion_all_py', anonymous=True)
sensor_fusion_ros = CameraLidarFusionLanesROSNode()
sensor_fusion_ros.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment