Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Last active March 15, 2023 07:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awesomebytes/48d44a88eb6b51431f3df9b1e3d13096 to your computer and use it in GitHub Desktop.
Save awesomebytes/48d44a88eb6b51431f3df9b1e3d13096 to your computer and use it in GitHub Desktop.
Python ROS 1 Subscriber that provides serialized messages and allows to deserialize at a later point
#!/usr/bin/env python3
from typing import Any
import rospy
import types
# Possible future idea: this could allow to also drop messages on purpose on the Python side
# deserializing is the second most expensive thing to do anyways (the first being just receiving the message)
# Author: Sam Pfeiffer <sammypfeiffer at gmail.com>
# License: BSD-3
class SubscriberRaw(rospy.Subscriber):
"""
Subscriber but with the ability of accessing the raw serialized messages and delaying deserialization to just-before-use.
This allows to pass serialized buffers from/to Python-C++ (and potentially other languages).
The callback returns a message of the class as usual, however, it is not deserialized by default, the user needs to call .deserialize()
"""
def __init__(self, name, data_class, callback=None, callback_args=None,
queue_size=None, buff_size=rospy.impl.tcpros_base.DEFAULT_BUFF_SIZE, tcp_nodelay=False):
# Store the data class so we can trigger deserialization when necessary
self._real_data_class = data_class
self._original_callback = callback
# The way to do this is to subscribe to rospy.AnyMsg and provide an overriden .deserialize() method on the callback message
def monkeypatch_deserialize(subclassed_msg_instance):
"""Deserialize into this same message"""
if subclassed_msg_instance.serialized_data and not subclassed_msg_instance._deserialized:
subclassed_msg_instance = self._real_data_class.deserialize(
subclassed_msg_instance, subclassed_msg_instance.serialized_data)
subclassed_msg_instance._deserialized = True
# Note: we could remove (del) the special attributes and make it a normal message instance if we wanted to
return subclassed_msg_instance
elif not subclassed_msg_instance.serialized_data:
RuntimeError(
"Tried to deserialize a message with no serialized data.")
# We could call deserialize multiple times but it will do nothing
def monkeypatch__getattribute__(subclassed_msg_instance, __name: str) -> Any:
# If the message is deserialized, nothing to do
if super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'):
return super(self._real_data_class, subclassed_msg_instance).__getattribute__(__name)
# Raise if trying to access something in __slots__ that may not be initialized
if __name in self._real_data_class.__slots__ or __name == '__dict__' \
and not super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'):
raise RuntimeError(
"Attempting to access attribute '{}' of message type '{}' without deserializing. Call 'deserialize' first.".format(__name, self._real_data_class.__name__))
# Otherwise, do the normal thing
else:
return super(self._real_data_class, subclassed_msg_instance).__getattribute__(__name)
def monkeypatch__repr_str__(subclassed_msg_instance):
if super(self._real_data_class, subclassed_msg_instance).__getattribute__('_deserialized'):
return super(self._real_data_class, subclassed_msg_instance).__repr__()
else:
return "Serialized message, buffer contains: {}".format(subclassed_msg_instance.serialized_data)
data_subclass = type(
self._real_data_class.__name__, (self._real_data_class,), {})
# The subclass is necessary because otherwise we can't add attributes (the {} dict)
# For python object native methods (starting with __) we need to replace the method at the class level
data_subclass.__getattribute__ = monkeypatch__getattribute__
data_subclass.__repr__ = monkeypatch__repr_str__
data_subclass.__str__ = monkeypatch__repr_str__
def __wrapped_callback(msg, callback_args=callback_args):
# We need to subclass to be able to add attributes and modify methods
# This can be done in other ways, but this way, any error will be printed as if it was a classic ROS Message of the type
# And a user can choose to deserialize as late as they want,
# they could even choose to deserialize partially by implementing their own _MessageName.py
subclassed_msg = data_subclass()
subclassed_msg.serialized_data = msg._buff
subclassed_msg._deserialized = False
# Monkeypatch methods of this instance
subclassed_msg.deserialize = types.MethodType(
monkeypatch_deserialize, subclassed_msg)
if callback_args is not None:
self._original_callback(subclassed_msg, callback_args)
else:
self._original_callback(subclassed_msg)
# Pass our wrapped callback with our subclassed message instead of the original message
# Also pass rospy.AnyMsg as the message type
super(SubscriberRaw, self).__init__(name, rospy.AnyMsg, __wrapped_callback, callback_args,
queue_size, buff_size, tcp_nodelay)
if __name__ == '__main__':
from std_msgs.msg import String
from random import choice
def my_cb(msg):
print("Subscriber callback, got msg: {}".format(msg))
do_deserialize = choice([True, False])
if do_deserialize:
print("Deserializing message")
msg.deserialize()
else:
print("Not deserializing message to show possible user error")
print("msg.data: {}".format(msg.data))
rospy.init_node('test_node')
# Note: if in the same node, we subscribe with a normal message (not AnyMsg) to the same topic, we don't get the raw buffer anymore
# this is because rospy does not try to do anything smart about multiple subscriptions to the same topic (the Subscriber documentation
# warns about this behavior)
subs_raw = SubscriberRaw('test', String, my_cb, queue_size=1)
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment