Skip to content

Instantly share code, notes, and snippets.

@belem2050
Created November 11, 2023 16:40
Show Gist options
  • Save belem2050/2f90b694c69158c3d5cc5dbadbfd88f9 to your computer and use it in GitHub Desktop.
Save belem2050/2f90b694c69158c3d5cc5dbadbfd88f9 to your computer and use it in GitHub Desktop.
Class to send and receive ROS2 messages to a ROS master that has a rosbridge websocket.
#!/usr/bin/env python
import json
from uuid import uuid4
# Note that this needs:
# sudo pip install websocket-client
# not the library called 'websocket'
import websocket
import rclpy
from rosidl_runtime_py.convert import message_to_ordereddict
from rosidl_runtime_py.set_message import set_message_fields
from rclpy.clock import ROSClock
from geometry_msgs.msg import PoseStamped
"""
Class to send and receive ROS2 messages to a ROS master that has a rosbridge websocket.
Author: Dmitry Devitt (https://github.com/GigaFlopsis/rospy_websocker_client/blob/master/src/rospy_websocker_client/WebsocketROSClient.py)
Rework: Sammy Pfeiffer (https://gist.github.com/awesomebytes/67ef305a2d9072ac64b786af292f5907)
Rework : Moumouni BELEM ( ROS2 version )
"""
class WebsocketROSClient(object):
def __init__(self, websocket_ip, port=9090):
"""
Class to manage publishing to ROS thru a rosbridge websocket.
:param str websocket_ip: IP of the machine with the rosbridge server.
:param int port: Port of the websocket server, defaults to 9090.
"""
self._advertise_dict = {}
print("Connecting to websocket: {}:{}".format(websocket_ip, port))
try:
self.ws = websocket.create_connection(
'ws://' + websocket_ip + ':' + str(port))
except:
ConnectionRefusedError(f"Cannot connect to {websocket_ip}:{port}")
def _advertise(self, topic_name, topic_type):
"""
Advertise a topic with it's type in 'package/Message' format.
:param str topic_name: ROS topic name.
:param str topic_type: ROS topic type, e.g. std_msgs/String.
:returns str: ID to de-advertise later on.
"""
new_uuid = str(uuid4())
self._advertise_dict[new_uuid] = {'topic_name': topic_name,
'topic_type': topic_type}
advertise_msg = {"op": "advertise",
"id": new_uuid,
"topic": topic_name,
"type": topic_type
}
self.ws.send(json.dumps(advertise_msg))
return new_uuid
def _unadvertise(self, uuid):
unad_msg = {"op": "unadvertise",
"id": uuid,
# "topic": topic_name
}
self.ws.send(json.dumps(unad_msg))
def __del__(self):
"""Cleanup all advertisings"""
d = self._advertise_dict
for k in d:
self._unadvertise(k)
def _publish(self, topic_name, message):
"""
Publish onto the already advertised topic the msg in the shape of
a Python dict.
:param str topic_name: ROS topic name.
:param dict msg: Dictionary containing the definition of the message.
"""
msg = {
'op': 'publish',
'topic': topic_name,
'msg': message
}
json_msg = json.dumps(msg)
self.ws.send(json_msg)
def publish(self, topic_name, ros_message):
"""
Publish on a topic given ROS message thru rosbridge.
:param str topic_name: ROS topic name.
:param * ros_message: Any ROS message instance, e.g. LaserScan()
from sensor_msgs/LaserScan.
"""
# First check if we already advertised the topic
d = self._advertise_dict
for k in d:
if d[k]['topic_name'] == topic_name:
# Already advertised, do nothing
break
else:
# Not advertised, so we advertise
topic_type = self.__ros_msg_to_string_msg_type(ros_message)
self._advertise(topic_name, topic_type)
# Converting ROS message to a dictionary thru YAML
ros_message_as_dict = message_to_ordereddict(ros_message)
# Publishing
self._publish(topic_name, ros_message_as_dict)
def __ros_msg_to_string_msg_type(self, ros_message):
module = type(ros_message).__module__
point_by_slash = module.replace(".", "/")
str_slashed_msg = "/".join([point_by_slash, type(ros_message).__name__]) #std_msgs/msg/_string/String
str_msg_type = "/".join([i for i in str_slashed_msg.rsplit("/") if i != "msg" and not i.startswith("_")]) #std_msgs/String
return str_msg_type
def subscribe(self, topic_name, ros_message):
# First check if we already advertised the topic
d = self._advertise_dict
topic_type = self.__ros_msg_to_string_msg_type(ros_message)
for k in d:
if d[k]['topic_name'] == topic_name:
# Already advertised, do nothing
break
else:
# Not advertised, so we advertise
self._advertise(topic_name, topic_type)
# Publishing
return self._subscribe(topic_name, ros_message, topic_type)
def _subscribe(self, topic_name, message, type):
"""
Publish onto the already advertised topic the msg in the shape of
a Python dict.
:param str topic_name: ROS topic name.
:param dict msg: Dictionary containing the definition of the message.
"""
msg = {
'op': 'subscribe',
'topic': topic_name,
'type' : type
}
json_msg = json.dumps(msg)
self.ws.send(json_msg)
json_message = self.ws.recv()
dictionary = json.loads(json_message)['msg']
set_message_fields(message, dictionary)
# print("Type: '%s' \n Received: '%s'" % (type, message))
return message
if __name__ == '__main__':
rclpy.init()
connect = WebsocketROSClient('172.17.0.1')
try:
while rclpy.ok():
# Publish to server
pose = PoseStamped()
pose.header.stamp = ROSClock().now().to_msg()
print("Publishing to server")
connect.publish('/pose_fake_topic', pose)
# subscribe
result = connect.subscribe('/pose_fake_topic', PoseStamped())
print("result:", result)
except KeyboardInterrupt:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment