Skip to content

Instantly share code, notes, and snippets.

@zflat
Last active November 17, 2022 16:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zflat/ebe16ddd2941a7854bf5d05b2b50b891 to your computer and use it in GitHub Desktop.
Save zflat/ebe16ddd2941a7854bf5d05b2b50b891 to your computer and use it in GitHub Desktop.
Script to test sending ROS2 messages and record latency
#!/usr/bin/env python3
#
# This is a test script that can be used to test latency of sending ROS messages between containers
#
# First get a baseline by running everything in the same container
# Terminal 1:
# ```
# python3 talker_ros2_latency.py
# ```
# Terinal 2:
# ```
# python3 talker_ros2_latency.py echo
# ```
#
# Then run in different containers to compare with the baseline.
# Terminal 1, container A:
# ```
# python3 talker_ros2_latency.py
# ```
# Terinal 2, container B:
# ```
# python3 talker_ros2_latency.py echo
# ```
#
# To test with different message sizes, change the value assigned to self.data in the constructor.
import random
import rclpy
import sys
from rclpy.node import Node
from sensor_msgs.msg import Image
def diffStamps(later, earlier):
return (later.sec - earlier.sec) + (later.nanosec - earlier.nanosec) / 1e9
class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
self.pub = self.create_publisher(Image, "from_talker", 1)
self.sub = self.create_subscription(Image, "from_echo", self.echoCB, 1)
self.timer = self.create_timer(0.01, self.send)
self.count = 0
self.times = {}
self.data = [random.randint(0,255) for _ in range(int(1e6))]
def echoCB(self, msg):
sent_at = self.times[int(msg.header.frame_id)]
del self.times[int(msg.header.frame_id)]
now = self.get_clock().now().to_msg()
print("echo: {} {} {} {} {} {}".format(
msg.header.stamp.sec,
str(msg.header.stamp.nanosec).zfill(9),
int(msg.header.frame_id),
diffStamps(msg.header.stamp, sent_at),
diffStamps(now, msg.header.stamp),
diffStamps(now, sent_at),
))
def send(self):
msg = Image()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = str(self.count)
msg.data = self.data
self.times[self.count] = msg.header.stamp
self.pub.publish(msg)
self.count = self.count + 1
class EchoNode(Node):
def __init__(self):
super().__init__('echo')
self.pub = self.create_publisher(Image, "from_echo", 1)
self.sub = self.create_subscription(Image, "from_talker", self.talkCB, 1)
def talkCB(self, msg):
print("talk: {} {} {}".format(
msg.header.stamp.sec,
str(msg.header.stamp.nanosec).zfill(9),
msg.header.frame_id))
reply = msg
reply.header.stamp = self.get_clock().now().to_msg()
reply.header.frame_id = msg.header.frame_id
self.pub.publish(reply)
def main(args=None):
if args is None:
args = sys.argv
rclpy.init(args=args)
if(len(args) > 1 and args[1] == 'echo'):
node = EchoNode()
else:
node = TalkerNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment