Skip to content

Instantly share code, notes, and snippets.

@kosuke55
Created March 22, 2024 06:23
Show Gist options
  • Save kosuke55/1ef986d8d61c43e2eb6f0d9e198dd7a4 to your computer and use it in GitHub Desktop.
Save kosuke55/1ef986d8d61c43e2eb6f0d9e198dd7a4 to your computer and use it in GitHub Desktop.
import rclpy
from rclpy.node import Node
from autoware_auto_perception_msgs.msg import (
DetectedObjects,
TrackedObjects,
TrackedObject,
)
from unique_identifier_msgs.msg import UUID
from geometry_msgs.msg import (
PoseWithCovariance,
TwistWithCovariance,
AccelWithCovariance,
)
import tf_transformations
import numpy as np
class DetectedToTrackedNode(Node):
def __init__(self):
super().__init__("detected_to_tracked_node")
self.subscription = self.create_subscription(
DetectedObjects,
"/perception/object_recognition/detection/objects",
self.detected_objects_callback,
10,
)
self.publisher = self.create_publisher(
TrackedObjects, "/perception/object_recognition/tracking/objects", 10
)
self.static_uuid = UUID(
uuid=[123, 456, 789, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
)
self.yaw_rate = 0.2
self.yaw = 0.0
self.prev_stamp = None
def detected_objects_callback(self, msg):
tracked_objects_msg = TrackedObjects()
tracked_objects_msg.header = msg.header
if self.prev_stamp is None:
self.prev_stamp = msg.header.stamp
return
for detected_object in msg.objects:
tracked_object = TrackedObject()
tracked_object.object_id = self.static_uuid
tracked_object.existence_probability = detected_object.existence_probability
tracked_object.classification = detected_object.classification
quaternion = (
detected_object.kinematics.pose_with_covariance.pose.orientation.x,
detected_object.kinematics.pose_with_covariance.pose.orientation.y,
detected_object.kinematics.pose_with_covariance.pose.orientation.z,
detected_object.kinematics.pose_with_covariance.pose.orientation.w,
)
euler = tf_transformations.euler_from_quaternion(quaternion)
current_stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
prev_stamp_sec = self.prev_stamp.sec + self.prev_stamp.nanosec * 1e-9
elapsed_time = current_stamp_sec - prev_stamp_sec
self.yaw += self.yaw_rate * elapsed_time
self.yaw = (self.yaw + np.pi) % (2 * np.pi) - np.pi
euler = (euler[0], euler[1], self.yaw)
new_quaternion = tf_transformations.quaternion_from_euler(*euler)
tracked_object.kinematics.pose_with_covariance = (
detected_object.kinematics.pose_with_covariance
)
tracked_object.kinematics.orientation_availability = (
detected_object.kinematics.orientation_availability
)
tracked_object.kinematics.twist_with_covariance = (
detected_object.kinematics.twist_with_covariance
)
tracked_object.kinematics.pose_with_covariance.pose.orientation.x = (
new_quaternion[0]
)
tracked_object.kinematics.pose_with_covariance.pose.orientation.y = (
new_quaternion[1]
)
tracked_object.kinematics.pose_with_covariance.pose.orientation.z = (
new_quaternion[2]
)
tracked_object.kinematics.pose_with_covariance.pose.orientation.w = (
new_quaternion[3]
)
tracked_object.shape = detected_object.shape
tracked_objects_msg.objects.append(tracked_object)
self.publisher.publish(tracked_objects_msg)
self.prev_stamp = msg.header.stamp
def main(args=None):
rclpy.init(args=args)
node = DetectedToTrackedNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment