Skip to content

Instantly share code, notes, and snippets.

@jennyshane
Created January 21, 2020 17:15
Show Gist options
  • Save jennyshane/7b65b70aa2a7deda835f9199a9ce7d63 to your computer and use it in GitHub Desktop.
Save jennyshane/7b65b70aa2a7deda835f9199a9ce7d63 to your computer and use it in GitHub Desktop.
simple ros node to display camera stream using opencv
#!/usr/bin/env python
import threading
import rospy
import numpy as np
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import cv2
camera_image=np.zeros((480, 640, 3), np.uint8)
image_lock=threading.Lock()
quit_event=threading.Event()
update_event=threading.Event()
bridge=CvBridge()
def display_func(quit_ev, im_lock, cam_img, update_ev):
print(id(cam_img))
local_image=np.zeros((480, 640, 3), np.uint8)
cv2.namedWindow("image", flags=cv2.WINDOW_AUTOSIZE)
cv2.moveWindow("image", 0, 0)
while not quit_ev.is_set():
if update_ev.is_set():
im_lock.acquire()
print(id(cam_img))
np.copyto(local_image, cam_img)
im_lock.release()
update_ev.clear()
cv2.imshow("image", local_image)
cv2.waitKey(100)
def image_callback(msg, cam_im, im_lock, update_ev):
try:
cv_img=bridge.imgmsg_to_cv2(msg, "bgr8")
except CvBridgeError, e:
print(e)
else:
im_lock.acquire()
np.copyto(cam_im, cv_img)
print(id(cam_im))
im_lock.release()
update_ev.set()
def main():
print(id(camera_image))
display_thread=threading.Thread(target=display_func, args=(quit_event, image_lock, camera_image, update_event))
callback_partial=lambda x:image_callback(x, camera_image, image_lock, update_event)
rospy.init_node("image_listener")
image_topic="/cv_camera/image_raw"
rospy.Subscriber(image_topic, Image, callback_partial)
display_thread.start()
rospy.spin()
quit_event.set()
if __name__=="__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment