Skip to content

Instantly share code, notes, and snippets.

@huyaoyu
Last active October 21, 2020 02:46
Show Gist options
  • Save huyaoyu/d3669dfeca0453dc599eb8ab64d729a0 to your computer and use it in GitHub Desktop.
Save huyaoyu/d3669dfeca0453dc599eb8ab64d729a0 to your computer and use it in GitHub Desktop.
# Extract images for multiple topics from a ROS bagfile.
# Yaoyu Hu <yaoyuh@andrew.cmu.edu>
# Written with reference to
# https://gist.github.com/wngreene/835cda68ddd9c5416defce876a4d7dd9
import argparse
import cv2
import os
import sys
# ROS specific packages.
import rosbag
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
def extract_strings(inStr, delimiter=','):
    """Split a delimited string into a list of whitespace-stripped items.

    Args:
        inStr: The raw string, e.g. ``'/cam0/image_raw, /cam1/image_raw'``.
        delimiter: The separator between items. Defaults to a comma.

    Returns:
        A list with one stripped string per delimited field, in input order.
        Blank fields between delimiters are kept so that positions stay
        aligned with any parallel list parsed the same way.

    Raises:
        ValueError: If inStr holds no non-blank item at all. ValueError is a
            subclass of Exception, so existing ``except Exception`` handlers
            keep working.
    """
    items = [s.strip() for s in inStr.split(delimiter)]

    # str.split() with an explicit delimiter never returns an empty list
    # (''.split(',') == ['']), so a length check can never fire. Look for the
    # absence of any real content instead.
    if not any(items):
        raise ValueError('{} contains no topics. '.format(inStr))

    return items
def prepare_directories(dirs, prefix=''):
    """Make sure every requested directory exists below a common root.

    Args:
        dirs: Iterable of directory names, relative to prefix.
        prefix: Root path that each name is joined onto. Defaults to ''.

    Returns:
        A list of the joined paths, in the same order as dirs.
    """
    created = []

    for name in dirs:
        target = os.path.join(prefix, name)

        # Only create what is missing; existing directories are reused as-is.
        already_there = os.path.isdir(target)
        if not already_there:
            os.makedirs(target)

        created.append(target)

    return created
def load_bagfile(fn):
    """Open a bag file in read mode.

    Args:
        fn: Path of the bag file.

    Returns:
        The object returned by ``rosbag.Bag(fn, 'r')``.

    Raises:
        Exception: If fn is not an existing regular file.
    """
    if os.path.isfile(fn):
        return rosbag.Bag(fn, 'r')

    raise Exception('%s does not exist. ' % (fn))
def _needs_rgb_to_bgr(cvImg, encoding):
    """Tell whether cvImg must be channel-swapped before cv2.imwrite()."""
    shape = getattr(cvImg, 'shape', ())

    # Single-channel data (mono, depth, bayer, ...) has no colour order, and
    # cv2.cvtColor(..., COLOR_RGB2BGR) rejects it outright.
    if len(shape) != 3 or shape[2] not in (3, 4):
        return False

    # cv2.imwrite() expects BGR order, so data already declared as bgr*/bgra*
    # must be left alone. Everything else keeps the original swap.
    return not encoding.lower().startswith('bgr')


def extract_images(bag, topics, outDir, maxNum=0):
    """Write the image messages of the given topics to numbered PNG files.

    Each message on topics[i] is saved as outDir[i]/NNNNNN.png, where NNNNNN
    is a zero-based, per-topic counter. A summary is printed at the end.

    Args:
        bag: An opened bag object providing read_messages(topics=...).
        topics: List of topic names to extract.
        outDir: List of output directories, parallel to topics.
        maxNum: Stop after this many images in total (over all topics).
            0 disables the limit.

    Returns:
        None.
    """
    bridge = CvBridge()
    counter = [0] * len(topics)
    totalNum = 0

    for topic, msg, _ in bag.read_messages(topics=topics):
        limitReached = False

        # Scan every entry rather than stopping at the first match, so that a
        # topic listed more than once is written to each of its directories.
        for i, name in enumerate(topics):
            if topic != name:
                continue

            cvImg = bridge.imgmsg_to_cv2(msg)

            # NOTE(review): assumes the message carries an 'encoding' field as
            # sensor_msgs/Image does; a missing field falls back to the
            # original always-swap behaviour for colour images.
            encoding = getattr(msg, 'encoding', '') or ''
            if _needs_rgb_to_bgr(cvImg, encoding):
                cvImg = cv2.cvtColor(cvImg, cv2.COLOR_RGB2BGR)

            outFn = os.path.join(outDir[i], '%06d.png' % (counter[i]))
            cv2.imwrite(outFn, cvImg)
            print('%d of %s written to %s. ' % (counter[i] + 1, topic, outFn))

            counter[i] += 1
            totalNum += 1

            if maxNum != 0 and totalNum >= maxNum:
                print('Maximum number reached. Abort! ')
                limitReached = True
                break

        if limitReached:
            break

    print('Summary: ')
    for name, count, folder in zip(topics, counter, outDir):
        print('%s: %d images saved to %s. ' % (name, count, folder))
def handle_args():
    """Parse the command line of the script.

    Positional arguments: infile, outdir, subdirs, topics.
    Optional argument: --max-num (int, default 0).

    Returns:
        The argparse.Namespace produced by parsing sys.argv.
    """
    parser = argparse.ArgumentParser(description='Extract image from a bagfile. ')

    parser.add_argument('infile', type=str,
                        help='The input bagfile. ')
    parser.add_argument('outdir', type=str,
                        help='The root of output directory. ')
    # subdirs is split on commas and paired one-to-one with topics by main(),
    # so the help text says so explicitly.
    parser.add_argument('subdirs', type=str,
                        help='The sub-directories for each topic, separated by commas '
                             'and given in the same order as the topics. ')
    parser.add_argument('topics', type=str,
                        help='The topics, separated by commas. ')
    parser.add_argument('--max-num', type=int, default=0,
                        help='The maximum number of images to process. Debug use. Set 0 to disable. ')

    return parser.parse_args()
def main():
    """Run the extraction: parse arguments, create folders, dump the images.

    Returns:
        0 on success, for use as the process exit code.

    Raises:
        ValueError: If the number of topics differs from the number of
            sub-directories.
    """
    print('Extract image frames from a bagfile. ')

    args = handle_args()

    # Get the topic list.
    topics = extract_strings(args.topics)

    # Get the output sub-directories.
    subDirs = extract_strings(args.subdirs)

    # Explicit check instead of assert: asserts vanish under `python -O`,
    # which would let mismatched lists through to an IndexError later.
    if len(topics) != len(subDirs):
        raise ValueError(
            'Got %d topics but %d sub-directories. '
            % (len(topics), len(subDirs)))

    # Prepare the output directories.
    dirs = prepare_directories(subDirs, args.outdir)

    print('Extract images from the following topics: ')
    for t in topics:
        print(t)

    # Load the bag file.
    bag = load_bagfile(args.infile)

    # Close the bag even when extraction fails part-way.
    try:
        extract_images(bag, topics, dirs, args.max_num)
    finally:
        bag.close()

    print('Done. ')

    return 0
# Script entry point: hand main()'s return value to the shell as exit status.
if __name__ == '__main__':
    exit_code = main()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment