Last active
March 15, 2023 07:50
-
-
Save awesomebytes/48d44a88eb6b51431f3df9b1e3d13096 to your computer and use it in GitHub Desktop.
Python ROS 1 Subscriber that provides serialized messages and allows to deserialize at a later point
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from typing import Any | |
import rospy | |
import types | |
# Possible future idea: this could allow to also drop messages on purpose on the Python side | |
# deserializing is the second most expensive thing to do anyways (the first being just receiving the message) | |
# Author: Sam Pfeiffer <sammypfeiffer at gmail.com> | |
# License: BSD-3 | |
class SubscriberRaw(rospy.Subscriber): | |
""" | |
Subscriber but with the ability of accessing the raw serialized messages and delaying deserialization to just-before-use. | |
This allows to pass serialized buffers from/to Python-C++ (and potentially other languages). | |
The callback returns a message of the class as usual, however, it is not deserialized by default, the user needs to call .deserialize() | |
""" | |
def __init__(self, name, data_class, callback=None, callback_args=None, | |
queue_size=None, buff_size=rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, tcp_nodelay=False): | |
# Store the data class so we can trigger deserialization when necessary | |
self._real_data_class = data_class | |
self._original_callback = callback | |
# The way to do this is to subscribe to rospy.AnyMsg and provide an overriden .deserialize() method on the callback message | |
def monkeypatch_deserialize(subclassed_msg_instance): | |
"""Deserialize into this same message""" | |
if subclassed_msg_instance.serialized_data and not subclassed_msg_instance._deserialized: | |
subclassed_msg_instance = self._real_data_class.deserialize( | |
subclassed_msg_instance, subclassed_msg_instance.serialized_data) | |
subclassed_msg_instance._deserialized = True | |
# Note: we could remove (del) the special attributes and make it a normal message instance if we wanted to | |
return subclassed_msg_instance | |
elif not subclassed_msg_instance.serialized_data: | |
RuntimeError( | |
"Tried to deserialize a message with no serialized data.") | |
# We could call deserialize multiple times but it will do nothing | |
def monkeypatch__getattribute__(subclassed_msg_instance, __name: str) -> Any: | |
# If the message is deserialized, nothing to do | |
if super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'): | |
return super(self._real_data_class, subclassed_msg_instance).__getattribute__(__name) | |
# Raise if trying to access something in __slots__ that may not be initialized | |
if __name in self._real_data_class.__slots__ or __name == '__dict__' \ | |
and not super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'): | |
raise RuntimeError( | |
"Attempting to access attribute '{}' of message type '{}' without deserializing. Call 'deserialize' first.".format(__name, self._real_data_class.__name__)) | |
# Otherwise, do the normal thing | |
else: | |
return super(self._real_data_class, subclassed_msg_instance).__getattribute__(__name) | |
def monkeypatch__repr_str__(subclassed_msg_instance): | |
if super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'): | |
return super(self._real_data_class, subclassed_msg_instance).__repr__() | |
else: | |
return "Serialized message, buffer contains: {}".format(subclassed_msg_instance.serialized_data) | |
data_subclass = type( | |
self._real_data_class.__name__, (self._real_data_class,), {}) | |
# The subclass is necessary because otherwise we can't add attributes (the {} dict) | |
# For python object native methods (starting with __) we need to replace the method at the class level | |
data_subclass.__getattribute__ = monkeypatch__getattribute__ | |
data_subclass.__repr__ = monkeypatch__repr_str__ | |
data_subclass.__str__ = monkeypatch__repr_str__ | |
def __wrapped_callback(msg, callback_args=callback_args): | |
# We need to subclass to be able to add attributes and modify methods | |
# This can be done in other ways, but this way, any error will be printed as if it was a classic ROS Message of the type | |
# And a user can choose to deserialize as late as they want, | |
# they could even choose to deserialize partially by implementing their own _MessageName.py | |
subclassed_msg = data_subclass() | |
subclassed_msg.serialized_data = msg._buff | |
subclassed_msg._deserialized = False | |
# Monkeypatch methods of this instance | |
subclassed_msg.deserialize = types.MethodType( | |
monkeypatch_deserialize, subclassed_msg) | |
if callback_args is not None: | |
self._original_callback(subclassed_msg, callback_args) | |
else: | |
self._original_callback(subclassed_msg) | |
# Pass our wrapped callback with our subclassed message instead of the original message | |
# Also pass rospy.AnyMsg as the message type | |
super(SubscriberRaw, self).__init__(name, rospy.AnyMsg, __wrapped_callback, callback_args, | |
queue_size, buff_size, tcp_nodelay) | |
if __name__ == '__main__': | |
from std_msgs.msg import String | |
from random import choice | |
def my_cb(msg): | |
print("Subscriber callback, got msg: {}".format(msg)) | |
do_deserialize = choice([True, False]) | |
if do_deserialize: | |
print("Deserializing message") | |
msg.deserialize() | |
else: | |
print("Not deserializing message to show possible user error") | |
print("msg.data: {}".format(msg.data)) | |
rospy.init_node('test_node') | |
# Note: if in the same node, we subscribe with a normal message (not AnyMsg) to the same topic, we don't get the raw buffer anymore | |
# this is because rospy does not try to do anything smart about multiple subscriptions to the same topic (the Subscriber documentation | |
# warns about this behavior) | |
subs_raw = SubscriberRaw('test', String, my_cb, queue_size=1) | |
rospy.spin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment