Skip to content

Instantly share code, notes, and snippets.

@tim-fan
Created February 11, 2019 09:07
Show Gist options
  • Save tim-fan/ab6c12afa89bb24da5f150ade3f65537 to your computer and use it in GitHub Desktop.
Save tim-fan/ab6c12afa89bb24da5f150ade3f65537 to your computer and use it in GitHub Desktop.
Ros node for running Rovio from output of realsense driver
#!/usr/bin/env python
#ROS node to adapt output of realsense camera driver (testing with D435i)
#to be used for input to Rovio
#
# Specifically, the node resolves two issues:
#
# 1) The realsense driver publishes images with encoding set as "8UC1"
# This leads to the following error in Rovio:
# "CvBridgeError: [8UC1] is not a color format. but [mono8] is"
# This node works around the issue by republishing incoming images
# with encoding set to "mono8"
#
# 2) Rovio expects imu timestamps to be strictly increasing, but at times
# the Realsense driver publishes consecutive imu messages with the
# same timestamp (expected to be related to the way the driver merges
# separate accelerometer and gyro streams).
# This node works around the issue by discarding IMU messages with the
# same timestamp as the previous message
#
# See:
# http://wiki.ros.org/RealSense
# https://github.com/ethz-asl/maplab
import rospy
from sensor_msgs.msg import Image, Imu
class ImuFilter:
def __init__(self):
self.lastImuTime = rospy.Time(0)
def filterImu(self, imuMsg):
if imuMsg.header.stamp == self.lastImuTime:
rospy.logwarn("Dropping IMU message with duplicate timestamp")
return None
else:
self.lastImuTime = imuMsg.header.stamp
return imuMsg
def updateImgEncoding(imgMsg):
imgMsg.encoding = "mono8"
return imgMsg
def adaptor():
rospy.init_node('realsense_rovio_adaptor')
imgPub = rospy.Publisher('image_mono8', Image, queue_size=10)
def imgCallback(msg):
imgPub.publish(updateImgEncoding(msg))
rospy.Subscriber("image", Image, imgCallback)
imuFilter = ImuFilter()
imuPub = rospy.Publisher('imu_filtered', Imu, queue_size=10)
def imuCallback(msg):
filteredMsg = imuFilter.filterImu(msg)
if filteredMsg is not None:
imuPub.publish(filteredMsg)
rospy.Subscriber("imu", Imu, imuCallback)
rospy.spin()
if __name__ == '__main__':
adaptor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment