Last active
October 24, 2023 07:43
-
-
Save Sarath18/7c48b6f2e667bf6dab1b12f419cab397 to your computer and use it in GitHub Desktop.
Publish images (sensor_msgs/msg/Image) from webcam or video on ROS2 topic(s)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import rclpy | |
from rclpy.node import Node | |
import cv2 | |
import os | |
import sys | |
from sensor_msgs.msg import Image | |
from cv_bridge import CvBridge, CvBridgeError | |
class ImagePublisher(Node): | |
def __init__(self, file_path): | |
super().__init__("image_publisher") | |
self.bridge = CvBridge() | |
self.cap = cv2.VideoCapture(file_path) | |
self.pub = self.create_publisher(Image, "/video", 10) | |
def run(self): | |
while(self.cap.isOpened()): | |
ret, frame = self.cap.read() | |
if ret: | |
self.pub.publish(self.bridge.cv2_to_imgmsg(frame, "bgr8")) | |
else: | |
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
self.cap.release() | |
def main(args=None): | |
rclpy.init(args=args) | |
if(len(sys.argv) is not 2): | |
print("Incorrect number of arguments\nUsage:\n\tpython3 <path_to_video_file>") | |
exit() | |
if not os.path.isfile(sys.argv[1]): | |
print("Invalid file path") | |
exit() | |
ip = ImagePublisher(sys.argv[1]) | |
print("Publishing...") | |
ip.run() | |
ip.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import rclpy | |
from rclpy.node import Node | |
import cv2 | |
from sensor_msgs.msg import Image | |
from cv_bridge import CvBridge, CvBridgeError | |
class ImagePublisher(Node): | |
def __init__(self): | |
super().__init__("image_publisher") | |
self.bridge = CvBridge() | |
self.cap = cv2.VideoCapture(0) | |
self.pub = self.create_publisher(Image, "/image", 10) | |
self.rgb8pub = self.create_publisher(Image, "/image/rgb", 10) | |
self.bgr8pub = self.create_publisher(Image, "/image/bgr", 10) | |
self.mono8pub = self.create_publisher(Image, "/image/mono", 10) | |
def run(self): | |
while True: | |
try: | |
r, frame = self.cap.read() | |
if not r: | |
return | |
self.pub.publish(self.bridge.cv2_to_imgmsg(frame, "bgr8")) | |
# BGR8 | |
self.bgr8pub.publish(self.bridge.cv2_to_imgmsg(frame, "bgr8")) | |
# RGB8 | |
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
self.rgb8pub.publish(self.bridge.cv2_to_imgmsg(frame_rgb, "rgb8")) | |
# MONO8 | |
frame_mono = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
self.mono8pub.publish(self.bridge.cv2_to_imgmsg(frame_mono, "mono8")) | |
except CvBridgeError as e: | |
print(e) | |
self.cap.release() | |
def main(args=None): | |
rclpy.init(args=args) | |
ip = ImagePublisher() | |
print("Publishing...") | |
ip.run() | |
ip.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment