Skip to content

Instantly share code, notes, and snippets.

@tejashah88
Created November 17, 2023 01:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tejashah88/2726b4e029dfd39a4ff811420258fde1 to your computer and use it in GitHub Desktop.
Save tejashah88/2726b4e029dfd39a4ff811420258fde1 to your computer and use it in GitHub Desktop.

Create a new package

cd ~/ros2_ws/src
ros2 pkg create monitor_station --build-type ament_python --dependencies rclpy
code monitor_station/

Install extra dependencies

sudo apt install python3-pynput

Starter Publisher Node Code

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class ExamplePublisher(Node):
    def __init__(self):
        super().__init__('example_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)
    node = ExamplePublisher()
    rclpy.spin(node)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Starter Subscriber Node Code

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class ExampleSubscriber(Node):
    def __init__(self):
        super().__init__('example_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10,
        )

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)
    node = MinimalSubscriber()
    rclpy.spin(node)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Launch file

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package="monitor_station",
            executable="temperature_sensor",
        ),
        Node(
            package="monitor_station",
            executable="mouse_sensor",
        ),
        Node(
            package="monitor_station",
            executable="dashboard",
        ),
    ])
@tejashah88
Copy link
Author

Temperature Sensor Node

import rclpy
from rclpy.node import Node

from std_msgs.msg import Float32

import random

class TempSensor(Node):
    def __init__(self):
        super().__init__('temp_sensor')
        self.publisher_ = self.create_publisher(Float32, 'temperature', 10)

        timer_period = 1  # seconds
        self.timer = self.create_timer(timer_period, self.publish_fake_data)
        

    def publish_fake_data(self):
        msg = Float32()

        msg.data = random.uniform(70, 90)
        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = TempSensor()
    rclpy.spin(node)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

@tejashah88
Copy link
Author

Mouse Sensor

import rclpy
from rclpy.node import Node

from std_msgs.msg import Int32MultiArray

from pynput.mouse import Controller

class MouseSensor(Node):
    def __init__(self):
        super().__init__('mouse_sensor')
        self.publisher_ = self.create_publisher(Int32MultiArray, 'mouse', 10)

        timer_period = 1 / 10  # seconds
        self.timer = self.create_timer(timer_period, self.publish_real_data)

        self.mouse = Controller()
        

    def publish_real_data(self):
        msg = Int32MultiArray()
        mouse_pos = self.mouse.position

        msg.data = mouse_pos
        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = MouseSensor()
    rclpy.spin(node)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

@tejashah88
Copy link
Author

Dashboard node

import rclpy
from rclpy.node import Node

from std_msgs.msg import Float32, Int32MultiArray


class Dashboard(Node):
    def __init__(self):
        super().__init__('dashboard')
        
        self.temp_sub = self.create_subscription(
            Float32,
            'temperature',
            self.on_temperature_reading,
            10,
        )

        self.mouse_sub = self.create_subscription(
            Int32MultiArray,
            'mouse',
            self.on_mouse_reading,
            10,
        )


    def on_temperature_reading(self, msg):
        self.get_logger().info(f'temp = {msg.data}')

    
    def on_mouse_reading(self, msg):
        self.get_logger().info(f'mouse = {msg.data}')


def main(args=None):
    rclpy.init(args=args)
    node = Dashboard()
    rclpy.spin(node)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment