Skip to content

Instantly share code, notes, and snippets.

@weimzh
Last active May 10, 2022 11:52
Show Gist options
  • Save weimzh/582978e2cc49594557c91b1cd90157ab to your computer and use it in GitHub Desktop.
Save weimzh/582978e2cc49594557c91b1cd90157ab to your computer and use it in GitHub Desktop.
Extracting definition of customized messages from ROS bag files
#!/usr/bin/env python
#
# Copyright (c) 2022, Wei Mingzhi <whistler_wmz@users.sf.net>.
#
# SPDX-License-Identifier: BSD-3-Clause
# https://spdx.org/licenses/BSD-3-Clause.html
#
import sys

# Python 2 only: switch the implicit str <-> unicode conversion to UTF-8 so
# message definitions containing non-ASCII text can be written out.
# reload(sys) brings back sys.setdefaultencoding, which is removed from the
# module at interpreter start-up. Python 3 has neither the reload() builtin
# nor this function (its default encoding is already UTF-8), so running these
# two calls unconditionally would abort the script with a NameError there.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf-8')
import rosbag
import sys
import os
import glob
import re
# Maps each message type name (msg._type) to the full text of its definition
# (msg._full_text), gathered from every bag file named on the command line.
types = {}
for bagname in sys.argv[1:]:
    # Open the bag as a context manager so its file handle is released even
    # when reading fails part-way through.
    with rosbag.Bag(bagname) as bag:
        for topic, msg, t in bag.read_messages():
            # Only the first occurrence of each type is recorded.
            if msg._type not in types:
                types[msg._type] = msg._full_text
# Maps each package name to the list of packages it depends on. It is filled
# in while the .msg files are written and read again when the build files
# are generated.
packages = {}


def _register_package(pkg):
    """Track *pkg* in ``packages`` and make sure ``<pkg>/msg`` exists on disk."""
    if pkg not in packages:
        # Every generated package depends on std_msgs, except std_msgs
        # itself: a package must not list itself as a dependency.
        packages[pkg] = [] if pkg == 'std_msgs' else ['std_msgs']
    msg_dir = os.path.join(pkg, 'msg')
    try:
        os.makedirs(msg_dir)
    except OSError:
        # A directory left over from an earlier run is fine; anything else
        # (permissions, a regular file in the way) is a genuine error.
        if not os.path.isdir(msg_dir):
            raise


def _add_dependency(pkg, dep):
    """Record that *pkg* depends on *dep*, skipping self-references and duplicates."""
    if dep != pkg and dep not in packages[pkg]:
        packages[pkg].append(dep)


def _split_definitions(type_name, full_text):
    """Split a concatenated definition into ``(type_name, lines)`` sections.

    The lines of *type_name* itself come first. Every embedded type follows
    after a line starting with ``===`` and a ``MSG: pkg/Type`` header line.
    """
    sections = [(type_name, [])]
    for line in full_text.split('\n'):
        if line.startswith('==='):
            # Separator only; the following "MSG: " line starts the section.
            continue
        if line.startswith('MSG: '):
            sections.append((line.split()[1], []))
        else:
            sections[-1][1].append(line)
    return sections


for t in types:
    top_pkg = t.split('/')[0]
    for type_name, lines in _split_definitions(t, types[t]):
        pkg, name = type_name.split('/')[:2]
        _register_package(pkg)
        # The package of the top-level type depends on the package of every
        # type embedded in its definition.
        _add_dependency(top_pkg, pkg)
        with open(os.path.join(pkg, 'msg', name + '.msg'), 'w') as f:
            for line in lines:
                f.write(line + "\n")
                # A field whose type is written as "pkg/Type" makes the
                # current package depend on pkg. Comment lines are ignored,
                # and split() tolerates leading whitespace.
                tokens = line.split()
                if tokens and '/' in tokens[0] and not tokens[0].startswith('#'):
                    _add_dependency(pkg, tokens[0].split('/')[0])
# Generate CMakeLists.txt and package.xml for every collected package
for package in packages:
    deps = packages[package]
    # The same dependency listing is repeated inside three CMake calls.
    dep_args = "".join(" " + name + "\n" for name in deps)

    with open(package + '/CMakeLists.txt', 'w') as cmake_file:
        cmake_file.write("cmake_minimum_required(VERSION 2.8.3)\n")
        cmake_file.write("project(" + package + ")\n")
        cmake_file.write("find_package(catkin REQUIRED COMPONENTS roscpp rospy message_generation \n")
        cmake_file.write(dep_args)
        cmake_file.write(")\n\n")

        # List every message file found in the package's msg directory.
        cmake_file.write("add_message_files(FILES \n")
        msg_paths = glob.glob(package + '/msg/*.msg')
        cmake_file.write("".join(" " + path.split('/')[-1] + "\n" for path in msg_paths))
        cmake_file.write(")\n\n")

        cmake_file.write("generate_messages(DEPENDENCIES \n")
        cmake_file.write(dep_args)
        cmake_file.write(")\n\n")

        cmake_file.write("catkin_package(CATKIN_DEPENDS message_generation roscpp rospy \n")
        cmake_file.write(dep_args)
        cmake_file.write(")\n\n")

    with open(package + '/package.xml', 'w') as manifest:
        # Fixed manifest header
        manifest.write('<?xml version="1.0"?>\n')
        manifest.write('<package format="2">\n')
        manifest.write(" <name>" + package + "</name>\n")
        manifest.write(" <version>0.0.0</version>\n")
        manifest.write(" <description>auto generated from rosbag</description>\n")
        manifest.write(' <maintainer email="jane@doe.com">jane</maintainer>\n')
        manifest.write(" <license>TODO</license>\n")
        manifest.write(" <buildtool_depend>catkin</buildtool_depend>\n")

        # Build-time dependencies
        manifest.write(" <build_depend>roscpp</build_depend>\n")
        manifest.write(" <build_depend>rospy</build_depend>\n")
        manifest.write(" <build_depend>message_generation</build_depend>\n")
        manifest.write("".join(" <build_depend>" + name + "</build_depend>\n" for name in deps))

        # Dependencies exported to packages that build against this one
        manifest.write(" <build_export_depend>roscpp</build_export_depend>\n")
        manifest.write(" <build_export_depend>rospy</build_export_depend>\n")
        manifest.write(" <build_export_depend>message_generation</build_export_depend>\n")
        manifest.write("".join(" <build_export_depend>" + name + "</build_export_depend>\n" for name in deps))

        # Run-time dependencies
        manifest.write(" <exec_depend>roscpp</exec_depend>\n")
        manifest.write(" <exec_depend>rospy</exec_depend>\n")
        manifest.write("".join(" <exec_depend>" + name + "</exec_depend>\n" for name in deps))

        manifest.write(" <export></export>\n")
        manifest.write("</package>")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment