Skip to content

Instantly share code, notes, and snippets.

@horverno
Created June 16, 2023 12:52
Show Gist options
  • Save horverno/5f2bab8449a8aadb583cabb8d71298a7 to your computer and use it in GitHub Desktop.
Save horverno/5f2bab8449a8aadb583cabb8d71298a7 to your computer and use it in GitHub Desktop.
# Reading from a MCAP file
# sudo apt install ros-$ROS_DISTRO-ros-base ros-$ROS_DISTRO-ros2bag ros-$ROS_DISTRO-rosbag2-transport ros-$ROS_DISTRO-rosbag2-storage-mcap
from mcap import __version__
print(__version__)
"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API."""
import argparse
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
import rosbag2_py
def read_messages(input_bag: str):
reader = rosbag2_py.SequentialReader()
reader.open(
rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
rosbag2_py.ConverterOptions(
input_serialization_format="cdr", output_serialization_format="cdr"
),
)
topic_types = reader.get_all_topics_and_types()
def typename(topic_name):
for topic_type in topic_types:
if topic_type.name == topic_name:
return topic_type.type
raise ValueError(f"topic {topic_name} not in bag")
while reader.has_next():
topic, data, timestamp = reader.read_next()
msg_type = get_message(typename(topic))
msg = deserialize_message(data, msg_type)
yield topic, msg, timestamp
del reader
def main():
inpu = "/mnt/c/bag/gyor_ros2_lexus_gps/gyor_pelda_odom.mcap"
i = 0
first_run = True
for topic, msg, timestamp in read_messages(inpu):
if(first_run):
timestamp_start = timestamp
first_run = False
if (i % 1000 == 0):
if(topic == "/lexus3/vehicle_steering"):
print("steer: %.2f (%.4f)" % (msg.data, (timestamp-timestamp_start) / 1000000000))
if(topic == "/lexus3/gps/duro/imu"):
print("imu: %.2f %.2f (%.4f)" % (msg.angular_velocity.x, msg.angular_velocity.y, (timestamp-timestamp_start) / 1000000000))
#print("%s <%s>" % (topic, type(msg)))
i += 1
if __name__ == "__main__":
main()
@horverno
Copy link
Author

inpu = "/mnt/c/bag/zala_ros2_leaf_gps/nissan_uni_60_1856_dynamic_2023_07_11-11_08_0.mcap"
i = 0
first_run = True
plt.gca().set_aspect('equal', adjustable='box')
plt.title('Vehicle Odometry' + inpu)
plt.xlabel('X')
plt.ylabel('Y')
#plt.legend()
plt.grid(True)
wheelbase = 2.7
theta = 0.0
pos_x = 0.0
pos_y = 0.0
prev_steer_time = 0.0
prev_speed_time = 0.0
steer_time = 0.0
speed_time = 0.0
speed_data = 0.0
steer_data = 0.0
for topic, msg, timestamp in read_messages(inpu):
    if(first_run):
        timestamp_start = timestamp
        first_run = False
    if (i % 1 == 0):
        if(topic == "/nissan/vehicle_steering"):
            steer_data = msg.data
            steer_time = (timestamp-timestamp_start) / 1000000000
        if(topic == "/nissan/vehicle_speed"):
            speed_data = msg.data
            speed_time = (timestamp-timestamp_start) / 1000000000
        if(topic == "/nissan/gps/duro/current_pose"):
            plt.plot(msg.pose.position.x, msg.pose.position.y, 'b.')
        steer_duration = prev_steer_time - steer_time
        speed_duration = prev_speed_time - speed_time
        # Ackermann Bicycle model
        pos_x += speed_duration * speed_data * np.cos(theta)
        pos_y += speed_duration * speed_data * np.sin(theta)
        theta -= steer_duration * speed_data / wheelbase * np.tan(steer_data)
        prev_steer_time = steer_time
        prev_speed_time = speed_time
        plt.plot(pos_x, pos_y, 'r.')
    i += 1

plt.show()

@horverno
Copy link
Author

# Reading from a MCAP file
# sudo apt install ros-$ROS_DISTRO-ros-base ros-$ROS_DISTRO-ros2bag ros-$ROS_DISTRO-rosbag2-transport ros-$ROS_DISTRO-rosbag2-storage-mcap 
import matplotlib.pyplot as plt
import numpy as np
from mcap import __version__
print(__version__)


"""script that reads ROS2 messages from an MCAP bag using the rosbag2_py API."""
import argparse
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
import rosbag2_py


def read_messages(input_bag: str):
    reader = rosbag2_py.SequentialReader()
    reader.open(
        rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
        rosbag2_py.ConverterOptions(
            input_serialization_format="cdr", output_serialization_format="cdr"
        ),
    )

    topic_types = reader.get_all_topics_and_types()

    def typename(topic_name):
        for topic_type in topic_types:
            if topic_type.name == topic_name:
                return topic_type.type
        raise ValueError(f"topic {topic_name} not in bag")

    while reader.has_next():
        topic, data, timestamp = reader.read_next()
        msg_type = get_message(typename(topic))
        msg = deserialize_message(data, msg_type)
        yield topic, msg, timestamp
    del reader

@horverno
Copy link
Author

inpu = "/mnt/c/bag/zala_ros2_leaf_gps/nissan_uni_60_1856_dynamic_2023_07_11-11_08_0.mcap"
i = 0
first_run = True
plt.gca().set_aspect('equal', adjustable='box')
plt.title('Vehicle Odometry' + inpu)
plt.xlabel('X')
plt.ylabel('Y')
#plt.legend()
plt.grid(True)
wheelbase = 2.7
theta = 0.0
pos_x = 0.0
pos_y = 0.0
prev_steer_time = 0.0
prev_speed_time = 0.0
steer_time = 0.0
speed_time = 0.0
speed_data = 0.0
steer_data = 0.0
for topic, msg, timestamp in read_messages(inpu):
    if(first_run):
        timestamp_start = timestamp
        first_run = False
    if (i % 1 == 0):
        if(topic == "/nissan/vehicle_steering"):
            steer_data = msg.data
            steer_time = (timestamp-timestamp_start) / 1000000000
        if(topic == "/nissan/vehicle_speed"):
            speed_data = msg.data
            speed_time = (timestamp-timestamp_start) / 1000000000
        if(topic == "/nissan/gps/duro/current_pose"):
            plt.plot(msg.pose.position.x, msg.pose.position.y, 'b.')
        steer_duration = prev_steer_time - steer_time
        speed_duration = prev_speed_time - speed_time
        # Ackermann Bicycle model
        pos_x += speed_duration * speed_data * np.cos(theta)
        pos_y += speed_duration * speed_data * np.sin(theta)
        theta += steer_duration * speed_data / wheelbase * np.tan(steer_data * -0.7)
        prev_steer_time = steer_time
        prev_speed_time = speed_time
        plt.plot(pos_x, pos_y, 'r.')
    i += 1

plt.show()
```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment