Skip to content

Instantly share code, notes, and snippets.

@weimzh
Last active April 25, 2022 03:22
Show Gist options
  • Save weimzh/4e97b35bcf70ba63cc650cbd54816207 to your computer and use it in GitHub Desktop.
Save weimzh/4e97b35bcf70ba63cc650cbd54816207 to your computer and use it in GitHub Desktop.
multiple rosbag reader in python
# Copyright (c) 2022, Wei Mingzhi.
# Released under BSD License.
# https://opensource.org/licenses/BSD-3-Clause
import rosbag
class _PriorityQueue(object):
def __init__(self):
self.queue = []
def is_empty(self):
return len(self.queue) == 0
def insert(self, data):
self.queue.append(data)
def delete(self):
max = 0
for i in range(len(self.queue)):
if self.queue[i] > self.queue[max]:
max = i
item = self.queue[max]
del self.queue[max]
return item
class _BagMsgEntry(object):
def __init__(self, bag_it, msg, topic, ts):
self.bag_it = bag_it
self.msg = msg
self.topic = topic
self.ts = ts
def __gt__(self, other):
return self.ts < other.ts
class MultiBagReader(object):
def __init__(self, bags, topics=None, start_time=None, end_time=None):
self._pq = _PriorityQueue()
for bag in bags:
bag_it = bag.read_messages(topics=topics, start_time=start_time, end_time=end_time)
self._put_next_msg(bag_it)
def _put_next_msg(self, bag_it):
try:
topic, msg, t = bag_it.__next__()
entry = _BagMsgEntry(bag_it, msg, topic, t)
self._pq.insert(entry)
except StopIteration:
pass
def read_messages(self):
while not self._pq.is_empty():
try:
m = self._pq.delete()
self._put_next_msg(m.bag_it)
yield (m.topic, m.msg, m.ts)
except IndexError:
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment