Last active
October 21, 2020 02:46
-
-
Save huyaoyu/d3669dfeca0453dc599eb8ab64d729a0 to your computer and use it in GitHub Desktop.
Extract images for multiple topics from a ROS bagfile.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Yaoyu Hu <yaoyuh@andrew.cmu.edu> | |
# Referred to https://gist.github.com/wngreene/835cda68ddd9c5416defce876a4d7dd9 | |
# when I was writing this code. | |
import argparse | |
import cv2 | |
import os | |
import sys | |
# ROS specific packages. | |
import rosbag | |
from sensor_msgs.msg import Image | |
from cv_bridge import CvBridge | |
def extract_strings(inStr, delimiter=','): | |
ss = inStr.split(delimiter) | |
if ( 0 == len(ss) ): | |
raise Exception('{} contains no topics. '.format(inStr)) | |
return [ s.strip() for s in ss ] | |
def prepare_directories(dirs, prefix=''): | |
fullDirs = [] | |
for d in dirs: | |
out = os.path.join( prefix, d ) | |
if ( not os.path.isdir(out) ): | |
os.makedirs(out) | |
fullDirs.append(out) | |
return fullDirs | |
def load_bagfile(fn): | |
if ( not os.path.isfile(fn) ): | |
raise Exception('%s does not exist. ' % (fn)) | |
return rosbag.Bag( fn, 'r' ) | |
def extract_images(bag, topics, outDir, maxNum=0): | |
bridge = CvBridge() | |
nTopics = len( topics ) | |
counter = [ 0 for _ in range(nTopics) ] | |
totalNum = 0 | |
flagBreak = False | |
for topic, msg, t in bag.read_messages(topics=topics): | |
for i in range(nTopics): | |
if ( topic == topics[i] ): | |
cvImg = bridge.imgmsg_to_cv2(msg) | |
cvImg = cv2.cvtColor( cvImg, cv2.COLOR_RGB2BGR ) | |
outFn = os.path.join( outDir[i], '%06d.png' % (counter[i]) ) | |
cv2.imwrite(outFn, cvImg) | |
print('%d of %s written to %s. ' % ( counter[i]+1, topic, outFn )) | |
counter[i] += 1 | |
totalNum += 1 | |
if ( maxNum != 0 and totalNum >= maxNum ): | |
print('Maximum number reached. Abort! ') | |
flagBreak = True | |
break | |
if ( flagBreak ): | |
break | |
print('Summary: ') | |
for i in range(nTopics): | |
print('%s: %d images saved to %s. ' % ( topics[i], counter[i], outDir[i] )) | |
def handle_args(): | |
parser = argparse.ArgumentParser(description='Extract image from a bagfile. ') | |
parser.add_argument('infile', type=str, \ | |
help='The input bagfile. ') | |
parser.add_argument('outdir', type=str, \ | |
help='The root of output directory. ') | |
parser.add_argument('subdirs', type=str, \ | |
help='The sub-directoreis for each topic. ') | |
parser.add_argument('topics', type=str, \ | |
help='The topics, separated by commas. ') | |
parser.add_argument('--max-num', type=int, default=0, \ | |
help='The maximum number of images to process. Debug use. Set 0 to disable. ') | |
return parser.parse_args() | |
def main(): | |
print('Extract image frames from a bagfile. ') | |
args = handle_args() | |
# Get the topic list. | |
topics = extract_strings( args.topics ) | |
# Get the output sub-directories. | |
subDirs = extract_strings( args.subdirs ) | |
assert ( len( topics ) == len( subDirs ) ) | |
# Prepare the output directories. | |
dirs = prepare_directories( subDirs, args.outdir ) | |
print('Extract images from the following topics: ') | |
for t in topics: | |
print(t) | |
# Load the bag file. | |
bag = load_bagfile( args.infile ) | |
extract_images( bag, topics, dirs, args.max_num ) | |
# Clean up. | |
bag.close() | |
print('Done. ') | |
return 0 | |
if __name__ == '__main__': | |
sys.exit( main() ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment