Skip to content

Instantly share code, notes, and snippets.

@kosuke55
Last active January 22, 2023 05:04
Show Gist options
  • Save kosuke55/3d23165bbc88893eb749074db264911b to your computer and use it in GitHub Desktop.
Save kosuke55/3d23165bbc88893eb749074db264911b to your computer and use it in GitHub Desktop.
import argparse
import csv
import os
from datetime import datetime
import sqlite3
import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
"""
reference: https://answers.ros.org/question/358686/how-to-read-a-bag-file-in-ros2/
"""
class BagFileParser():
def __init__(self, bag_file):
self.conn = sqlite3.connect(bag_file)
self.cursor = self.conn.cursor()
# create a message type map
topics_data = self.cursor.execute(
"SELECT id, name, type FROM topics").fetchall()
self.topic_type \
= {name_of: type_of for id_of, name_of, type_of in topics_data}
self.topic_id \
= {name_of: id_of for id_of, name_of, type_of in topics_data}
self.topic_msg_message = {
name_of: get_message(type_of) for id_of,
name_of,
type_of in topics_data}
self.topic_names = [
'/sensing/lidar/left/velodyne_packets',
'/sensing/lidar/top/velodyne_packets',
'/sensing/lidar/right/velodyne_packets',
'/perception/object_recognition/detection/objects',
'/perception/object_recognition/tracking/objects',
'/perception/object_recognition/objects',
]
self.topics = [self.get_messages(t) for t in self.topic_names]
self.stamp_and_rate = {}
for n in self.topic_names:
self.stamp_and_rate[n] = []
self.window_size = 10
def __del__(self):
self.conn.close()
# Return [(timestamp0, message0), (timestamp1, message1), ...]
def get_messages(self, topic_name):
topic_id = self.topic_id[topic_name]
# Get from the db
rows = self.cursor.execute(
"SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall()
# Deserialise all and timestamp them
return [(timestamp, deserialize_message(
data, self.topic_msg_message[topic_name])) for timestamp, data in rows]
def get_topic_rate(self, stamp_list):
if len(stamp_list) < 2:
return 0.0
window_size = min(len(stamp_list), self.window_size)
start_time = stamp_list[-window_size]
end_time = stamp_list[-1]
time_diff = end_time - start_time
return (window_size - 1) / time_diff
def run(self):
for topic, name in zip(self.topics, self.topic_names):
stamp_list = []
for msg in topic:
stamp = msg[0] / 1000000000
# stamp = msg[1].header.stamp.sec + msg[1].header.stamp.nanosec / 1000000000.
stamp_list.append(stamp)
if len(stamp_list) > self.window_size:
topic_rate = self.get_topic_rate(stamp_list)
self.stamp_and_rate[name].append([stamp, topic_rate])
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel("stamp")
ax.set_ylabel("hz")
for name in self.topic_names:
arr = np.array(self.stamp_and_rate[name])
stamp = arr[:, 0]
rate = arr[:, 1]
ax.plot(stamp, rate, label=name)
ax.legend()
plt.show()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='report hz from rosbag.')
parser.add_argument('bag_file', help='input bagfile')
args = parser.parse_args()
parser = BagFileParser(args.bag_file)
parser.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment