Skip to content

Instantly share code, notes, and snippets.

@wildlarva
Created January 27, 2024 11:53
Show Gist options
  • Save wildlarva/8dae3b01d3be50f9c3746ed7245dbdfa to your computer and use it in GitHub Desktop.
Save wildlarva/8dae3b01d3be50f9c3746ed7245dbdfa to your computer and use it in GitHub Desktop.
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from typing import Callable
from typing import Optional
from typing import TypeVar
from typing import Union
from rclpy.callback_groups import CallbackGroup
from rclpy.exceptions import InvalidTopicNameException
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.qos_overriding_options import _declare_qos_parameters
from rclpy.qos_overriding_options import QoSOverridingOptions
from rclpy.subscription import Subscription
from rclpy.type_support import check_is_valid_msg_type
# Generic placeholder for the message class handled by a subscription callback.
MsgType = TypeVar("MsgType")
class NodeEx(Node):
    """Node extension that can build a subscription without registering it on the node."""

    def create_subscription_ex(
        self,
        msg_type,
        topic: str,
        callback: Optional[Callable[[MsgType], None]],
        qos_profile: Union[QoSProfile, int],
        *,
        callback_group: Optional[CallbackGroup] = None,
        event_callbacks: Optional[SubscriptionEventCallbacks] = None,
        qos_overriding_options: Optional[QoSOverridingOptions] = None,
        raw: bool = False
    ) -> Subscription:
        """
        Create a new subscription without registering it for callback execution.

        The subscription is constructed and returned, but it is deliberately not
        attached to the callback group, the node's subscription list, or the
        executor. The caller is expected to poll it manually.

        :param msg_type: The type of ROS messages the subscription will subscribe to.
        :param topic: The name of the topic the subscription will subscribe to.
        :param callback: A user-defined callback function stored on the subscription.
            Because the subscription is not registered, this method does not arrange
            for the callback to be executed.
        :param qos_profile: A QoSProfile or a history depth to apply to the subscription.
            In the case that a history depth is provided, the QoS history is set to
            KEEP_LAST, the QoS history depth is set to the value
            of the parameter, and all other QoS settings are set to their default values.
        :param callback_group: The callback group for the subscription. If ``None``, then the
            default callback group for the node is used.
        :param event_callbacks: User-defined callbacks for middleware events. If ``None``,
            a default-constructed ``SubscriptionEventCallbacks`` is used.
        :param qos_overriding_options: Options forwarded to the QoS parameter declaration.
            If ``None``, an empty ``QoSOverridingOptions([])`` is used.
        :param raw: If ``True``, then received messages will be stored in raw binary
            representation.
        :return: The newly created, unregistered subscription.
        :raises InvalidTopicNameException: If a failure is traced back to an invalid
            topic name by ``_validate_topic_or_service_name``.
        :raises RuntimeError: If resolving the topic name fails for another reason.
        :raises ValueError: If the underlying subscription cannot be created even though
            the topic name validates.
        """
        qos_profile = self._validate_qos_or_depth_parameter(qos_profile)
        callback_group = callback_group or self.default_callback_group

        try:
            final_topic = self.resolve_topic_name(topic)
        except RuntimeError:
            # if it's name validation error, raise a more appropriate exception.
            try:
                self._validate_topic_or_service_name(topic)
            except InvalidTopicNameException as ex:
                raise ex from None
            # else reraise the previous exception
            raise

        if qos_overriding_options is None:
            qos_overriding_options = QoSOverridingOptions([])
        _declare_qos_parameters(
            Subscription, self, final_topic, qos_profile, qos_overriding_options)

        # this line imports the typesupport for the message module if not already done
        check_is_valid_msg_type(msg_type)

        creation_error = None
        try:
            with self.handle:
                subscription_object = _rclpy.Subscription(
                    self.handle, msg_type, topic, qos_profile.get_c_qos_profile())
        except ValueError as ex:
            # Keep the error and leave the handler first, so a name-validation
            # exception raised below is not chained to this ValueError.
            creation_error = ex
        if creation_error is not None:
            # Prefer the more specific topic-name error when that is the cause...
            self._validate_topic_or_service_name(topic)
            # ...otherwise surface the original failure instead of continuing with
            # an unbound ``subscription_object``.
            raise creation_error

        try:
            subscription = Subscription(
                subscription_object, msg_type,
                topic, callback, callback_group, qos_profile, raw,
                event_callbacks=event_callbacks or SubscriptionEventCallbacks())
        except Exception:
            subscription_object.destroy_when_not_in_use()
            raise

        # Registration is intentionally skipped so that the subscription callback
        # is not executed: the subscription is NOT added to ``callback_group``,
        # NOT appended to ``self._subscriptions``, the executor is NOT woken, and
        # the subscription's event handlers are NOT added as waitables.
        # NOTE(review): since it is absent from ``self._subscriptions``, node
        # teardown presumably will not destroy it -- confirm who owns cleanup.
        return subscription
class MinimalSubscriber(NodeEx):
    """Node that polls its ``'topic'`` subscription from a timer and logs what it takes."""

    # Class-level counter; not referenced by any code visible in this class.
    count_ = 0

    def __init__(self):
        """Create the subscription via ``create_subscription_ex`` and start the polling timer."""
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription_ex(
            String, 'topic', self.listener_callback, 10)
        # The timer period is given in seconds: poll once per second.
        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        """Take one message directly from the subscription handle and log its ``data``."""
        sub = self.subscription
        if sub:
            taken = sub.handle.take_message(sub.msg_type, sub.raw)
            # A falsy result is treated as "nothing to report"; otherwise the
            # first element of the result is the message itself.
            if taken:
                self.get_logger().info('I heard: "%s"' % taken[0].data)

    def listener_callback(self, msg):
        """Log the ``data`` field of a message handed to the subscription callback."""
        self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
    """
    Run a ``MinimalSubscriber`` node.

    Initialises rclpy with ``args``, spins the node, and once ``rclpy.spin``
    returns destroys the node and shuts rclpy down.

    :param args: Arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    node = MinimalSubscriber()
    rclpy.spin(node)
    # Only reached after spin() returns normally.
    node.destroy_node()
    rclpy.shutdown()
# Start the node only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment