Last active
August 2, 2022 15:23
-
-
Save peci1/5d99a6938f9203256837358f13d49dbf to your computer and use it in GitHub Desktop.
Python script that extracts images (currently only sensor_msgs/CompressedImage messages) from a ROS bag file into separate image files on disk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import argparse | |
import os | |
import rosbag | |
import sys | |
try:
    from tqdm import tqdm
except ImportError:
    class tqdm(object):
        """Minimal stand-in for ``tqdm.tqdm`` used when tqdm is not installed.

        Only the subset of the interface used by this script is provided:
        construction with ``total``, ``update(n)`` and use as a context
        manager. Progress is printed to stdout as a whole-number percentage,
        at most once per percent of ``total``.
        """

        def __init__(self, total=100):
            """Remember the expected number of steps and hint at the real tqdm."""
            print("Install tqdm to view a nicer progressbar", file=sys.stderr)
            self._total = total
            self._i = 0
            self._last_reported = 0

        def update(self, n=1):
            """Advance the counter by ``n`` steps and print the percentage if due."""
            self._i += n
            if not self._total:
                # Nothing meaningful to report (and avoids dividing by zero).
                return
            if self._i == 0 or (self._i - self._last_reported > self._total / 100.0):
                print("%i %%" % (100.0 * self._i / self._total,))
                self._last_reported = self._i

        def __enter__(self):
            """Reset the counters and return the progress object itself."""
            self._i = self._last_reported = 0
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            """Leave the ``with`` block without suppressing any exception."""
            # Returning a truthy value here would swallow errors raised in
            # the body of the ``with`` statement.
            return False
# Directories already known to exist, so repeated mkdir() calls stay cheap.
dirs = set()
# Message types (as reported by the bag) that main() knows how to export.
supported_types = ("sensor_msgs/CompressedImage",)
# TODO add Image along the lines of https://github.com/christian-rauch/rgbd_export/blob/master/rgbd_sync_export.py


def mkdir(dir):
    """Make sure directory ``dir`` exists, creating parent directories as needed.

    Paths that were created or found to exist are remembered in the
    module-level ``dirs`` set so later calls for the same path return
    immediately without touching the filesystem.

    This is best-effort: if the directory cannot be created, a warning is
    printed to stderr and the function returns without raising.

    :param dir: Path of the directory to create.
    """
    if dir in dirs:
        return
    if not os.path.isdir(dir):
        try:
            os.makedirs(dir)
        except OSError as e:
            # The directory may have appeared between the check and the
            # creation attempt; only report the error if it is still missing.
            if not os.path.isdir(dir):
                print("Could not create directory %s: %s" % (dir, e), file=sys.stderr)
                return
    dirs.add(dir)
def _image_extension(image_format):
    """Map a CompressedImage ``format`` string to a file extension.

    :param image_format: Value of the message's ``format`` field.
    :return: ``"png"`` or ``"jpg"``, or ``None`` if neither is mentioned.
    """
    if "png" in image_format:
        return "png"
    if "jpeg" in image_format or "jpg" in image_format:
        return "jpg"
    return None


def main():
    """Export the images stored in a ROS bag to individual files.

    Command-line arguments:
      --bag-file    Input bag (required).
      --output-dir  Directory the images are written to (required; created
                    if missing).
      --prefix      String prepended to every file name; with --subfolders
                    it is also used as an intermediate directory.
      --topics      Topics to export; all topics are read when omitted.
      --subfolders  Put each topic's images into its own subdirectory.
      --format      Accepted for raw images, but not used by the code yet.

    Only messages whose type is listed in ``supported_types`` are exported.
    Files are named ``<prefix><topic>-<secs>.<nsecs>.<ext>`` using the
    message header stamp, with ``/`` in the topic replaced by ``_``.
    """
    parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
    # Both paths are mandatory: without them the script would fail later on None.
    parser.add_argument("--bag-file", required=True, help="Input ROS bag.")
    parser.add_argument("--output-dir", required=True, help="Output directory.")
    parser.add_argument("--prefix", default="", help="Image prefix.")
    parser.add_argument("--topics", nargs="*", default=tuple(), help="Topics to export (all image topics if undefined)")
    parser.add_argument("--subfolders", default=False, action="store_true",
                        help="Whether to create subfolders for individual cameras.")
    parser.add_argument("--format", default="png", help="Format for saving raw images")
    args = parser.parse_args()

    print("Extract images from %s into %s" % (args.bag_file, args.output_dir))
    mkdir(args.output_dir)

    # None tells rosbag not to filter by topic at all.
    topics = None
    if args.topics:
        topics = args.topics
        print("Images from the following topics will be saved: " + ", ".join(topics))
    else:
        print("Images from all topics will be saved")

    filename_template = "%s%s-%i.%09i.%s"

    # The context manager closes the bag even if exporting fails midway.
    with rosbag.Bag(args.bag_file, "r") as bag:
        num_msgs = bag.get_message_count(topic_filters=topics)
        with tqdm(total=num_msgs) as progress:
            # raw=True defers deserialization so unsupported messages are
            # skipped without being parsed.
            for topic, msg_raw, _t in bag.read_messages(topics=topics, raw=True):
                progress.update(1)
                datatype, data, _, _, msg_type = msg_raw
                if datatype not in supported_types:
                    continue

                msg = msg_type()
                msg.deserialize(data)
                stamp = msg.header.stamp
                camera = topic.replace("/", "_")

                out_dir = args.output_dir
                if args.subfolders:
                    if args.prefix:
                        out_dir = os.path.join(out_dir, args.prefix)
                    out_dir = os.path.join(out_dir, camera)
                    mkdir(out_dir)

                if datatype == "sensor_msgs/CompressedImage":
                    image_data = msg.data
                    ext = _image_extension(msg.format)
                    if ext is None:
                        print("Unknown image format %s on topic %s" % (msg.format, topic), file=sys.stderr)
                        continue

                filename = filename_template % (args.prefix, camera, stamp.secs, stamp.nsecs, ext)
                filename = os.path.join(out_dir, filename)
                # Image payloads are binary; text mode would fail on Python 3
                # and could corrupt the data on platforms that translate newlines.
                with open(filename, "wb") as f:
                    f.write(image_data)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment