Skip to content

Instantly share code, notes, and snippets.

@peci1
Last active August 2, 2022 15:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peci1/5d99a6938f9203256837358f13d49dbf to your computer and use it in GitHub Desktop.
Save peci1/5d99a6938f9203256837358f13d49dbf to your computer and use it in GitHub Desktop.
Python script that extracts all compressed images (sensor_msgs/CompressedImage messages) from a ROS bag file into separate image files on disk
#!/usr/bin/env python
from __future__ import print_function
import argparse
import os
import rosbag
import sys
try:
    from tqdm import tqdm
except ImportError:
    class tqdm(object):
        """Minimal text-only stand-in for ``tqdm.tqdm`` when tqdm is not installed.

        Only the subset used by this script is provided: construction with
        ``total``, ``update()`` and use as a context manager.
        """

        def __init__(self, total=100):
            """Remember the expected number of steps and hint at the nicer alternative."""
            print("Install tqdm to view a nicer progressbar", file=sys.stderr)
            self._total = total
            self._i = 0
            self._last_reported = 0

        def update(self, n=1):
            """Advance the counter by ``n`` and print the percentage about once per percent."""
            self._i += n
            if not self._total:
                # Zero or unknown total: a percentage cannot be computed, and
                # dividing by it below would raise.
                return
            if self._i == 0 or (self._i - self._last_reported > self._total / 100.0):
                print("%i %%" % (100.0 * self._i / self._total,))
                self._last_reported = self._i

        def __enter__(self):
            """Reset the counters and return the progress object itself."""
            self._i = self._last_reported = 0
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            """Leave the ``with`` block without suppressing exceptions."""
            # A truthy return value from __exit__ tells Python to swallow the
            # exception raised inside the ``with`` body; errors must propagate.
            return False
# Directories already known to exist; lets mkdir() skip repeated filesystem checks.
dirs = set()
# Message types this script knows how to export.
supported_types = ("sensor_msgs/CompressedImage",)
# TODO add Image along the lines of https://github.com/christian-rauch/rgbd_export/blob/master/rgbd_sync_export.py


def mkdir(dir):
    """Make sure directory ``dir`` exists, creating it (and its parents) if needed.

    Successful results are remembered in the module-level ``dirs`` set, so
    repeated calls for the same path do not touch the filesystem again.
    Creation failures are reported on stderr instead of being silently
    ignored; the path is then not cached, so a later call retries.

    :param dir: Path of the directory to create. (The name shadows the
                builtin but is kept so keyword callers keep working.)
    """
    if dir in dirs:
        return
    if not os.path.isdir(dir):
        try:
            os.makedirs(dir)
        except OSError as e:
            # Something else may have created the directory between the check
            # and makedirs(); that is fine. Anything else is a real failure.
            if not os.path.isdir(dir):
                print("Could not create directory %s: %s" % (dir, e), file=sys.stderr)
                return
    dirs.add(dir)
def _compressed_image_extension(image_format):
    """Map a CompressedImage ``format`` string to a file extension, or None if unknown."""
    if "png" in image_format:
        return "png"
    if "jpeg" in image_format or "jpg" in image_format:
        return "jpg"
    return None


def main():
    """Parse the command line and dump every supported image message to a file.

    Each message on the selected topics (all topics when ``--topics`` is not
    given) whose type is listed in ``supported_types`` is written to
    ``<prefix><topic with '/' replaced by '_'>-<secs>.<nsecs>.<ext>``, where
    the timestamp comes from the message header. With ``--subfolders`` the
    files go to ``<output-dir>/[<prefix>/]<topic>/`` instead of directly into
    the output directory.
    """
    parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
    # Both paths are mandatory: without them the script cannot do anything and
    # would otherwise fail later with an unhelpful error on a None path.
    parser.add_argument("--bag-file", required=True, help="Input ROS bag.")
    parser.add_argument("--output-dir", required=True, help="Output directory.")
    parser.add_argument("--prefix", default="", help="Image prefix.")
    parser.add_argument("--topics", nargs="*", default=tuple(), help="Topics to export (all image topics if undefined)")
    parser.add_argument("--subfolders", default=False, action="store_true",
                        help="Whether to create subfolders for individual cameras.")
    # Not used yet: only already-compressed images are exported (see the TODO
    # about raw Image support).
    parser.add_argument("--format", default="png", help="Format for saving raw images")
    args = parser.parse_args()

    print("Extract images from %s into %s" % (args.bag_file, args.output_dir))
    mkdir(args.output_dir)

    # None means "no topic filter" for both get_message_count and read_messages.
    topics = args.topics or None
    if topics:
        print("Images from the following topics will be saved: " + ", ".join(topics))
    else:
        print("Images from all topics will be saved")

    filename_template = "%s%s-%i.%09i.%s"

    # The context manager closes the bag even if extraction fails midway.
    with rosbag.Bag(args.bag_file, "r") as bag:
        num_msgs = bag.get_message_count(topic_filters=topics)
        with tqdm(total=num_msgs) as progress:
            # raw=True defers deserialization so unsupported messages are
            # skipped without paying the cost of decoding them.
            for topic, msg_raw, _ in bag.read_messages(topics=topics, raw=True):
                progress.update(1)
                datatype, serialized, _, _, msg_type = msg_raw
                if datatype not in supported_types:
                    continue

                msg = msg_type()
                msg.deserialize(serialized)
                stamp = msg.header.stamp
                camera = topic.replace("/", "_")

                out_dir = args.output_dir
                if args.subfolders:
                    if args.prefix:
                        out_dir = os.path.join(out_dir, args.prefix)
                    out_dir = os.path.join(out_dir, camera)
                mkdir(out_dir)

                if datatype == "sensor_msgs/CompressedImage":
                    ext = _compressed_image_extension(msg.format)
                    if ext is None:
                        print("Unknown image format %s on topic %s" % (msg.format, topic), file=sys.stderr)
                        continue
                    payload = msg.data
                else:
                    # Listed in supported_types but no exporter implemented here.
                    continue

                filename = filename_template % (args.prefix, camera, stamp.secs, stamp.nsecs, ext)
                filename = os.path.join(out_dir, filename)
                # Image payloads are binary: text mode would fail on Python 3
                # and mangle newline bytes on Windows.
                with open(filename, 'wb') as f:
                    f.write(payload)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment