Skip to content

Instantly share code, notes, and snippets.

@cmpute
Created October 29, 2022 19:19
Show Gist options
  • Save cmpute/ada6d4fa6e2ecdd262e2a75b52e91509 to your computer and use it in GitHub Desktop.
Save cmpute/ada6d4fa6e2ecdd262e2a75b52e91509 to your computer and use it in GitHub Desktop.
Extract the images in a rosbag as a video
#!/usr/bin/env python3
import os
import rosbag
from cv2 import VideoWriter, VideoWriter_fourcc
from cv_bridge import CvBridge
import argparse
class RosbagVideoReader:
    """Convert the images published on one rosbag topic into an MP4 video.

    The constructor opens the bag, looks at the first message of the topic to
    learn the frame size, and opens the output video writer.  ``process()``
    then replays the topic and writes frames at a constant frame rate.
    """

    def __init__(self, args) -> None:
        """Open the bag and prepare the video writer.

        Args:
            args: Namespace-like object with the attributes ``input`` (bag
                path), ``topic`` (image topic), ``output`` (video file name,
                empty to derive one), ``rate`` (playback speed factor) and
                ``fps`` (output frame rate).

        Raises:
            RuntimeError: If the bag file does not exist, the topic holds no
                messages, or the video writer cannot be opened.
            ValueError: If ``rate`` or ``fps`` is not a positive number.
        """
        # Defined first so close()/__del__ stay safe if construction fails.
        self.bag = None
        self.writer = None

        if not os.path.exists(args.input):
            raise RuntimeError("the specified rosbag doesn't exist")

        self.topic = args.topic
        # Command-line values may arrive as strings; normalise them once.
        self.rate = float(args.rate)
        self.fps = float(args.fps)
        if self.rate <= 0 or self.fps <= 0:
            raise ValueError("rate and fps must be positive")

        print("Loading the bag file...")
        self.bag = rosbag.Bag(args.input)

        # Only the first message is needed to learn the frame size and the
        # start timestamp; there is no reason to scan the whole bag here.
        first = next(iter(self.bag.read_messages(args.topic)), None)
        if first is None:
            self.close()
            raise RuntimeError("no messages found on topic %r" % (args.topic,))
        _, msg, t = first
        # NOTE(review): relies on the message exposing width/height, i.e. an
        # uncompressed image message -- confirm for the topics you use.
        frame_size = (msg.width, msg.height)

        outname = args.output
        if not outname:
            outname = os.path.split(args.input)[-1] + "_" + args.topic + "_" + str(t) + ".mp4"
            # Topic names contain slashes, which are not valid in a file name.
            outname = outname.replace("/", "_")
        if not outname.endswith(".mp4"):
            outname += ".mp4"
        print("Writing video to", outname)

        self.bridge = CvBridge()
        self.writer = VideoWriter(outname, VideoWriter_fourcc(*"x264"), self.fps, frame_size)
        if not self.writer.isOpened():
            self.close()
            raise RuntimeError("failed to open the video writer for %r" % (outname,))

        self._t_first = None  # bag timestamp of the first processed message
        self._t_video = 0     # duration of video written so far, in seconds
        self._t_prompt = 0    # video time at which progress was last printed

    def process(self):
        """Replay the topic and write its images to the video.

        Each image is repeated until the video time catches up with the
        message's (rate-scaled) time offset from the first message, so the
        output has a constant frame rate.  The writer is released when the
        replay ends, whether normally or through an exception.
        """
        frame_period = 1 / self.fps
        try:
            for (topic, msg, t) in self.bag.read_messages(self.topic):
                assert topic == self.topic
                if self._t_first is None:
                    self._t_first = t

                # Ask for BGR explicitly: that is the channel layout the
                # video writer expects, regardless of the source encoding.
                image = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")

                t_bag = (t - self._t_first).to_sec() / self.rate
                while self._t_video < t_bag:
                    self.writer.write(image)
                    self._t_video += frame_period

                # Report progress roughly once per second of written video.
                if self._t_video - self._t_prompt > 1:
                    self._t_prompt = self._t_video
                    print("Processed %.2fs" % self._t_prompt)
        finally:
            if self.writer is not None:
                self.writer.release()

    def close(self):
        """Release the video writer and close the bag; safe to call twice."""
        writer = getattr(self, "writer", None)
        if writer is not None:
            writer.release()
            self.writer = None
        bag = getattr(self, "bag", None)
        if bag is not None:
            bag.close()
            self.bag = None

    def __del__(self):
        self.close()
def define_args():
    """Parse the command line of the rosbag-to-video converter.

    Returns:
        argparse.Namespace with ``input``, ``topic``, ``output``, ``rate``
        and ``fps``.  ``rate`` and ``fps`` are converted to ``float`` so they
        can be used in arithmetic directly; without a ``type`` argparse would
        hand back user-supplied values as strings.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("input", help="The input rosbag file")
    parser.add_argument("topic", help="The image topic to be extracted")
    parser.add_argument("-o", "--output", help="The name of the output video file", default="")
    parser.add_argument("-r", "--rate", help="Playback rate of the output video", type=float, default=1.0)
    parser.add_argument("-f", "--fps", help="FPS of the output video", type=float, default=10.0)
    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: parse the command line, then convert the topic.
    cli_args = define_args()
    converter = RosbagVideoReader(cli_args)
    converter.process()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment