Skip to content

Instantly share code, notes, and snippets.

@IamPhytan
Created January 23, 2023 16:36
Show Gist options
  • Save IamPhytan/b2e411c2d064d01f3a5763017a48d53d to your computer and use it in GitHub Desktop.
Save IamPhytan/b2e411c2d064d01f3a5763017a48d53d to your computer and use it in GitHub Desktop.
MCAP to CSV
#!/usr/bin/env python3
import argparse
from collections import OrderedDict
from pathlib import Path
from typing import Dict, Tuple, Union
import pandas as pd
from mcap.reader import McapReader, make_reader
from mcap.records import Channel, Statistics
from mcap.summary import Summary
from mcap_ros1.reader import read_ros1_messages
from mcap_ros2.reader import read_ros2_messages
from tf_transformations import euler_from_quaternion
from tqdm import tqdm
class MCAPPath(type(Path())):
csv_type: str | None = None
def parse_arguments():
"""Parse mcapfile name"""
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--mcapfile", help="""MCAP file path""", required=True)
args = parser.parse_args()
mcapfile = args.mcapfile
mcap_filepath = MCAPPath(mcapfile)
return mcap_filepath
def get_ros_profile(reader: McapReader):
"""Get ROS Profile from a McapReader
Args:
filename (Path | str): _description_
Returns:
str: ROS profile of MCAP messages (ros1 / ros2)
"""
header = reader.get_header()
return header.profile
def slugify(topic: str):
"""Convert topic into a filename-compatible string
Slugify
Args:
topic: ROS Topic
Returns:
str: Slugified ROS Topic
"""
topic = topic[1:] if topic.startswith("/") else topic
return topic.replace("/", "_")
def get_topics(csv_type: str):
"""Get list of topics according to the csv type
Args:
csv_type (str): Type of CSV to export
Returns:
list: List of topics to export for the CSV type
"""
ToI = {
"elec": [
"/mcu/status",
"/left_drive/status/battery_voltage",
"/left_drive/status/battery_current",
"/right_drive/status/battery_voltage",
"/right_drive/status/battery_current",
],
"imu": [
"/imu/data",
"/imu_and_wheel_odom",
"/MTI_imu/data",
],
"gps": [
"/gps/fix_front",
"/gps/fix_back",
"/gps/vel_front",
"/gps/vel_back",
"/gps/time_reference_front",
"/gps/time_reference_back",
],
"odom": [
"/cmd_vel",
"/hri_cmd_vel",
"/rc_teleop/cmd_vel",
"/left_drive/status/speed",
"/left_drive/velocity",
"/right_drive/status/speed",
"/right_drive/velocity",
"/velocity",
"/warthog_velocity_controller/odom",
"/warthog_velocity_controller/cmd_vel",
"/icp_odom",
],
"temp": [
"/left_drive/status/heatsink_temperature",
"/left_drive/status/motor_temperature",
"/right_drive/status/heatsink_temperature",
"/right_drive/status/motor_temperature",
],
"doughnut": [
"/calib_switch",
],
}
return ToI[csv_type]
def decompose_msg(topic: str, msg):
"""Get important elements from a rosbag msg
Args:
topic (str): Topic of the msg
msg (rosbag.msg): Structure that contains information at a certain time in the rosbag
Raises:
NotImplementedError: Topic is not implemented
Returns:
dict: Dictionary of important values from the msg
"""
if topic == "/mcu/status":
return {
"measured_battery": msg.measured_battery,
"measured_48v": msg.measured_48v,
"measured_24v": msg.measured_24v,
"measured_12v": msg.measured_12v,
"current_battery": msg.current_battery,
"current_48v": msg.current_48v,
"current_24v": msg.current_24v,
"current_12v": msg.current_12v,
"current_computer": msg.current_computer,
}
elif topic in ["/imu/data", "/MTI_imu/data"]:
timestamp = msg.header.stamp.to_sec()
orient = msg.orientation
ang_vel = msg.angular_velocity
lin_acc = msg.linear_acceleration
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w])
roll, pitch, yaw = euler
return {
"time": timestamp,
"orientation": {
"x": orient.x,
"y": orient.y,
"z": orient.z,
"w": orient.w,
"roll": roll,
"pitch": pitch,
"yaw": yaw,
},
"ang_vel": {"x": ang_vel.x, "y": ang_vel.y, "z": ang_vel.z},
"lin_acc": {"x": lin_acc.x, "y": lin_acc.y, "z": lin_acc.z},
}
elif topic in ["/imu_and_wheel_odom", "/icp_odom"]:
timestamp = msg.header.stamp.to_sec()
pose = msg.pose.pose
posit = pose.position
orient = pose.orientation
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w])
roll, pitch, yaw = euler
twist = msg.twist.twist
lin_twi = twist.linear
ang_twi = twist.angular
return {
"time": timestamp,
"position": {"x": posit.x, "y": posit.y, "z": posit.z},
"orientation": {
"x": orient.x,
"y": orient.y,
"z": orient.z,
"w": orient.w,
"roll": roll,
"pitch": pitch,
"yaw": yaw,
},
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z},
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z},
}
elif topic == "/warthog_velocity_controller/odom":
timestamp = msg.header.stamp.to_sec()
pose = msg.pose.pose
posit = pose.position
orient = pose.orientation
euler = euler_from_quaternion([orient.x, orient.y, orient.z, orient.w])
roll, pitch, yaw = euler
twist = msg.twist.twist
lin_twi = twist.linear
ang_twi = twist.angular
return {
"time": timestamp,
"position": {"x": posit.x, "y": posit.y, "z": posit.z},
"orientation": {
"x": orient.x,
"y": orient.y,
"z": orient.z,
"w": orient.w,
"roll": roll,
"pitch": pitch,
"yaw": yaw,
},
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z},
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z},
}
elif "/gps/fix_" in topic:
# GPS Fix position
timestamp = msg.header.stamp.to_sec()
return {
"time": timestamp,
"latitude": msg.latitude,
"longitude": msg.longitude,
"altitude": msg.altitude,
}
elif "/gps/vel_" in topic:
# GPS Fix position
timestamp = msg.header.stamp.to_sec()
lin_twi = msg.twist.linear
ang_twi = msg.twist.angular
return {
"time": timestamp,
"lin_twi": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z},
"ang_twi": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z},
}
elif "/gps/time_reference_" in topic:
# GPS Fix position
timestamp = msg.header.stamp.to_sec()
time_ref = msg.time_ref.to_sec()
return {"time": timestamp, "time_ref": time_ref}
elif topic == "/velocity":
# Twist Stamped messaged
timestamp = msg.header.stamp.to_sec()
lin_twi = msg.twist.linear
ang_twi = msg.twist.angular
return {
"time": timestamp,
"linear": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z},
"angular": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z},
}
elif "cmd_vel" in topic:
# command velocity
lin_twi = msg.linear
ang_twi = msg.angular
return {
"linear": {"x": lin_twi.x, "y": lin_twi.y, "z": lin_twi.z},
"angular": {"x": ang_twi.x, "y": ang_twi.y, "z": ang_twi.z},
}
elif "_drive/status/speed" in topic or "_drive/velocity" in topic:
# Drive velocity data
return msg.data
elif "_drive/status/battery" in topic:
# Drive battery electrical data
return msg.data
elif "_temperature" in topic:
# Drive temperature data
return msg.data
elif topic == "/calib_switch":
# Doughnut calibration switch
return msg.data
else:
raise NotImplementedError(f"Not yet implemented : {topic}")
ChannelOrStatistics = Union[Channel, Statistics]
def get_topic_metadata(summary: Summary, topic: str) -> Dict[str, ChannelOrStatistics]:
"""Get metadata for a specific topic
Args:
summary: _description_
topic: Topic name
"""
stats = summary.statistics
channs = summary.channels
top_ch = tuple(ch for ch in channs.values() if ch.topic == topic)
if len(top_ch) == 0:
raise ValueError(f"Topic '{topic}' is not available in the mcap file")
top_ch = top_ch[0]
return {"channel": top_ch, "stats": stats}
def get_topic_msgs(topic_: str, reader: McapReader):
"""Get rosbag messages from a certain topic
Args:
topic_: Topic of interest
reader: Bag object created with the `rosbag` library
Returns:
List: List of messages from the topic. Each message is given in the form of a dictionary
"""
time_dat = OrderedDict.fromkeys(["time", topic_], 0)
data_list = []
# Get the right reader from the header profile
profile = reader.get_header().profile
read_funcs = {"ros1": read_ros1_messages, "ros2": read_ros2_messages}
func = read_funcs.get(profile.lower(), None)
if func is None:
raise NotImplementedError(f"Profile {profile} not implemented")
topic_meta = get_topic_metadata(reader.get_summary(), topic_)
top_ch = topic_meta["channel"]
top_stats = topic_meta["stats"]
msg_count = top_stats.channel_message_counts[top_ch.id]
with tqdm(total=msg_count) as pbar:
for msg in func(source=reader, topics=[topic_]):
t = msg.publish_time_ns / 1e9
time_dat["time"] = t
time_dat[topic_] = decompose_msg(topic_, msg.ros_msg)
# # Stretch out time_dat
normalized = pd.json_normalize(
time_dat,
sep="/",
record_prefix="/",
)
normalized = normalized.to_dict(orient="records")[0]
# New csv row
data_list.append(normalized)
# Progress bar
pbar.update(1)
return data_list
def export_to_csv(topic, msg_arr, mcap_path: Path):
df = pd.DataFrame(msg_arr)
if len(df) > 1:
df = df[["time"] + [c for c in df if c != "time"]]
# Filter CSV
df = df.fillna(method="pad")
df = df.dropna()
def is_parent(df: pd.DataFrame, name: str):
"""Determine if name in df columns is a parent of other columns"""
columns = ",".join(df.columns.tolist())
has_children = (name + "/") in columns
is_zero = ~(df[name].astype(bool)).all()
return has_children and is_zero
parent_cols = [c for c in df.columns.tolist() if is_parent(df, c)]
df = df.drop(columns=parent_cols)
# Create a folder for CSV
type_path = mcap_path.parent.resolve() / mcap_path.csv_type
if not type_path.exists():
type_path.mkdir(parents=True, exist_ok=True)
# Export df to CSV
out_filename = f"{mcap_path.stem}_{slugify(topic)}.csv"
csv_path = type_path / out_filename
print("Saving to", csv_path, "with columns :\n", *df.columns.values)
df.to_csv(csv_path, index=False)
def main():
mcapfp = parse_arguments()
mcapfp.__slots__ = (mcapfp.__slots__) + ("csv_type",)
mcapfp.csv_type = mcapfp.stem.rsplit("_", 1)[-1]
print(mcapfp.csv_type, mcapfp)
topics = get_topics(mcapfp.csv_type)
with open(mcapfp, "rb") as f:
reader = make_reader(f)
for topic in topics:
try:
messages = get_topic_msgs(topic, reader)
export_to_csv(topic, messages, mcapfp)
except ValueError as ve:
print(ve)
finally:
continue
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment