Last active
March 15, 2024 13:36
-
-
Save awesomebytes/85007248cd854b69f452fd58d9812343 to your computer and use it in GitHub Desktop.
message_filters ApproximateTimeSynchronizer over a rosbag (without roscore or /use_sim_time)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import itertools | |
import sys | |
import time | |
import rospy | |
import rosbag | |
import message_filters | |
""" | |
Example using a message_filters filter (ApproximateTimeSynchronizer) iterating | |
over a rosbag to synchronize some topics. | |
Scroll down to 'EDITABLE PART' if you want to adapt the script to a different | |
set of topics and parameters. | |
Author: Sammy Pfeiffer <sam.pfeiffer at conigital.com> | |
""" | |
# Monkey-patch Time.now so we don't need to call rospy.init_node (so we don't need a roscore) | |
# Also solves problems with /use_sim_time (simulated time) | |
class MonkeyPatchTime(rospy.Time): | |
def __init__(self, secs=0, nsecs=0): | |
super(rospy.Time, self).__init__(secs, nsecs) | |
@staticmethod | |
def now(): | |
# initialize with wallclock | |
float_secs = time.time() | |
secs = int(float_secs) | |
nsecs = int((float_secs - secs) * 1000000000) | |
return MonkeyPatchTime(secs, nsecs) | |
rospy.Time = MonkeyPatchTime | |
# Get an input rosbag | |
args = rospy.myargv(sys.argv) | |
if len(args) < 2: | |
raise RuntimeError('Missing input rosbag argument') | |
input_bag_name = args[1] | |
output_bag_name = input_bag_name + '_synchronized.bag' | |
print("Synchronizing {} into {}".format(input_bag_name, output_bag_name)) | |
# Create a rosbag with only synchronized messages | |
with rosbag.Bag(output_bag_name, 'w') as outbag: | |
# Setup an ApproximateTimeSynchronizer | |
# ------------------------------------------------------------------- | |
# EDITABLE PART: Substitute these with any kind of message or number of topics | |
from sensor_msgs.msg import NavSatFix | |
from sensor_msgs.msg import Imu | |
gnss_sub = message_filters.Subscriber('/gnss', NavSatFix) | |
imu_sub = message_filters.Subscriber('/imu/data', Imu) | |
# ApproximateTimeSynchronizer wants a list of subscribers | |
subscriber_list = [gnss_sub, imu_sub] | |
# Customize ApproximateTimeSynchronizer parameters | |
ats_queue_size = 1000 # Max messages in any queue | |
ats_slop = 0.1 # Max delay to allow between messages | |
rosbag_reader_skip_index = False # Makes opening the bag faster, but if the bag is unindexed it will fail | |
# ------------------------------------------------------------------- | |
# We want a dictionary view of the list for efficiency when dispatching messages | |
subscriber_dict = {} | |
for subscriber in subscriber_list: | |
subscriber_dict[subscriber.topic] = subscriber | |
# We want a list with the topic names in the same order to correctly dispatch messages | |
topic_names = [subscriber.topic for subscriber in subscriber_list] | |
ts = message_filters.ApproximateTimeSynchronizer(subscriber_list, | |
queue_size=ats_queue_size, | |
slop=ats_slop, | |
allow_headerless=False) | |
def callback(*msgs): | |
# The callback processing the pairs of numbers that arrived at approximately the same time | |
for topic_name, msg in itertools.izip(topic_names, msgs): | |
# Warning: we lose the original ROS time of publication | |
# This is because 1) We need to choose from which from any of the input messages | |
# 2) We need to create a mechanism to get those times to this callback | |
# The workaround is to use the message timestamp itself. | |
outbag.write(topic_name, msg, t=msg.header.stamp) | |
ts.registerCallback(callback) | |
print("Opening bag... (Only reading topics {})".format(topic_names)) | |
bag_reader = rosbag.Bag(input_bag_name, skip_index=True) | |
print("Synchronizing...") | |
ini_t = time.time() | |
for message_idx, (topic, msg, t) in enumerate(bag_reader.read_messages(topics=topic_names)): | |
# Send the message to the correct message_filters.Subscriber | |
subscriber = subscriber_dict.get(topic) | |
if subscriber: | |
# Show some output to show we are alive | |
if message_idx % 1000 == 0: | |
print("Message #{}, Topic: {}, message stamp: {}".format(message_idx, topic, msg.header.stamp)) | |
subscriber.signalMessage(msg) | |
fin_t = time.time() | |
total_t = fin_t - ini_t | |
print("Done. (Parsed {} messages in {}s)".format(message_idx, total_t)) | |
print("Output bag contains {} synchronized messages ( {} ).".format(outbag.get_message_count(), output_bag_name)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment