Last active
January 22, 2023 05:04
-
-
Save kosuke55/3d23165bbc88893eb749074db264911b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import csv | |
import os | |
from datetime import datetime | |
import sqlite3 | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from pprint import pprint | |
from rclpy.serialization import deserialize_message | |
from rosidl_runtime_py.utilities import get_message | |
""" | |
reference: https://answers.ros.org/question/358686/how-to-read-a-bag-file-in-ros2/ | |
""" | |
class BagFileParser(): | |
def __init__(self, bag_file): | |
self.conn = sqlite3.connect(bag_file) | |
self.cursor = self.conn.cursor() | |
# create a message type map | |
topics_data = self.cursor.execute( | |
"SELECT id, name, type FROM topics").fetchall() | |
self.topic_type \ | |
= {name_of: type_of for id_of, name_of, type_of in topics_data} | |
self.topic_id \ | |
= {name_of: id_of for id_of, name_of, type_of in topics_data} | |
self.topic_msg_message = { | |
name_of: get_message(type_of) for id_of, | |
name_of, | |
type_of in topics_data} | |
self.topic_names = [ | |
'/sensing/lidar/left/velodyne_packets', | |
'/sensing/lidar/top/velodyne_packets', | |
'/sensing/lidar/right/velodyne_packets', | |
'/perception/object_recognition/detection/objects', | |
'/perception/object_recognition/tracking/objects', | |
'/perception/object_recognition/objects', | |
] | |
self.topics = [self.get_messages(t) for t in self.topic_names] | |
self.stamp_and_rate = {} | |
for n in self.topic_names: | |
self.stamp_and_rate[n] = [] | |
self.window_size = 10 | |
def __del__(self): | |
self.conn.close() | |
# Return [(timestamp0, message0), (timestamp1, message1), ...] | |
def get_messages(self, topic_name): | |
topic_id = self.topic_id[topic_name] | |
# Get from the db | |
rows = self.cursor.execute( | |
"SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall() | |
# Deserialise all and timestamp them | |
return [(timestamp, deserialize_message( | |
data, self.topic_msg_message[topic_name])) for timestamp, data in rows] | |
def get_topic_rate(self, stamp_list): | |
if len(stamp_list) < 2: | |
return 0.0 | |
window_size = min(len(stamp_list), self.window_size) | |
start_time = stamp_list[-window_size] | |
end_time = stamp_list[-1] | |
time_diff = end_time - start_time | |
return (window_size - 1) / time_diff | |
def run(self): | |
for topic, name in zip(self.topics, self.topic_names): | |
stamp_list = [] | |
for msg in topic: | |
stamp = msg[0] / 1000000000 | |
# stamp = msg[1].header.stamp.sec + msg[1].header.stamp.nanosec / 1000000000. | |
stamp_list.append(stamp) | |
if len(stamp_list) > self.window_size: | |
topic_rate = self.get_topic_rate(stamp_list) | |
self.stamp_and_rate[name].append([stamp, topic_rate]) | |
fig = plt.figure() | |
ax = fig.add_subplot(1, 1, 1) | |
ax.set_xlabel("stamp") | |
ax.set_ylabel("hz") | |
for name in self.topic_names: | |
arr = np.array(self.stamp_and_rate[name]) | |
stamp = arr[:, 0] | |
rate = arr[:, 1] | |
ax.plot(stamp, rate, label=name) | |
ax.legend() | |
plt.show() | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description='report hz from rosbag.') | |
parser.add_argument('bag_file', help='input bagfile') | |
args = parser.parse_args() | |
parser = BagFileParser(args.bag_file) | |
parser.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment