Skip to content

Instantly share code, notes, and snippets.

@nathanmargaglio
Forked from mfehr/mergebag.py
Last active August 13, 2021 16:03
Show Gist options
  • Save nathanmargaglio/8bd51718648804d4958f01a9e27198a4 to your computer and use it in GitHub Desktop.
Save nathanmargaglio/8bd51718648804d4958f01a9e27198a4 to your computer and use it in GitHub Desktop.
Merge ROS Bags
#!/usr/bin/env python
# From https://gist.github.com/mfehr/305c1d07f6ca6a6e70afe1f155843d17
import rosbag
import argparse
import os
import logging
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(
prog = 'merge.py',
description='Merge ROS bags.'
)
parser.add_argument(
'-i', type=str,
help='prefix of the input files'
)
parser.add_argument(
'-o', type=str, default=None,
help='name of the output file'
)
parser.add_argument(
'--base', type=str, default=None,
help='merge in a base file'
)
parser.add_argument(
'--start_index', type=int, default=0,
help='the inclusive starting index for bag iteration'
)
parser.add_argument(
'--end_index', type=int, default=None,
help='the exclsuive ending index for bag iteration'
)
args = parser.parse_args()
return args
def merge_bags(out_path, in_prefix, base=None, start_index=0, end_index=None):
idx = start_index
with rosbag.Bag(out_path, 'w') as out_bag:
if base is not None:
logger.info('Merging base bag ' + str(base))
with rosbag.Bag(base) as in_bag:
for topic, msg, t in in_bag.read_messages():
out_bag.write(topic, msg, t)
while True:
if end_index is not None and idx >= end_index:
logger.info('End index reached.')
break
in_path = in_prefix + '_' + str(idx) + '.bag'
if not os.path.isfile(in_path):
break
logger.info('Processing ' + in_path)
with rosbag.Bag(in_path) as in_bag:
for topic, msg, t in in_bag.read_messages():
out_bag.write(topic, msg, t)
idx += 1
if __name__ == "__main__":
args = parse_args()
print(args)
in_prefix = args.i
if in_prefix is None:
raise ValueError('Input prefix (-i) argument required.')
if in_prefix[-1] == '_':
in_prefix = in_prefix[:-1]
out_path = args.o
if out_path is None:
out_path = in_prefix + '_merged'
if len(out_path.split('.')) < 2:
out_path += '.bag'
base = args.base
if len(base.split('.')) < 2:
base += '.bag'
start_index = args.start_index
end_index = args.end_index
merge_bags(
out_path=out_path,
in_prefix=in_prefix,
base=base,
start_index=start_index,
end_index=end_index
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment