Skip to content

Instantly share code, notes, and snippets.

@JEnoch
Created February 16, 2023 10:12
Show Gist options
  • Save JEnoch/e11404b1ed27e96679e7f4a09c7fa832 to your computer and use it in GitHub Desktop.
Save JEnoch/e11404b1ed27e96679e7f4a09c7fa832 to your computer and use it in GitHub Desktop.
A Zenoh python script demonstrating ROS2 + Zigbee/MQTT integration
# This script is an example of ROS2 + Zigbee/MQTT integration thanks to Zenoh.
#
# By default, it assumes that:
# - a zenoh-bridge-dds is deployed and routes the ROS2 traffic to zenoh. It's configured to use "simu" as a scope.
# - a zenoh-bridge-mqtt is deployed and routes the MQTT traffic coming from zigbee2mqtt software
#
# This script can connect to any Zenoh endpoint using the "-e" option (a Zenoh router, the zenoh-bridge-dds or the zenoh-bridge-mqtt).
# Then it performs the following:
#
# - It subscribes in Zenoh to "zigbee2mqtt/device/button" to receive JSON messages from zigbee2mqtt when a Zigbee push button named "device/button" is pressed.
# In reaction, it publishes Twist messages in Zenoh to "simu/rt/cmd_vel" to move the ROS2 robot
#
# - It subscribes in Zenoh to "simu/rt/scan" to receive LaserScan measurements from ROS2.
# The measurements for the front angle (60°) are extracted, the minimal range value is determined and a proximity level is computed.
# If the proximity level has changed since the last measurement, a JSON message is published in Zenoh to "zigbee2mqtt/device/bulb/set"
# to change the color and the brightness of a Zigbee lightbulb.
#
# - It reacts to keyboard events on the arrow keys and spacebar, publishing Twist messages in Zenoh to "simu/rt/cmd_vel" to move the ROS2 robot
#
import argparse
import curses
import json
import math
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List

import zenoh
from pycdr2 import IdlStruct
from pycdr2.types import int8, int32, uint32, float64, float32, sequence, array
#
# ROS2 topic type definitions, with serialize() and deserialize() operations added by pycdr2
#
@dataclass
class Vector3(IdlStruct, typename="Vector3"):
    """Three float64 components; used by Twist for its linear and angular parts."""
    # NOTE(review): field order presumably defines the serialized layout - do not reorder without confirming.
    x: float64
    y: float64
    z: float64
@dataclass
class Twist(IdlStruct, typename="Twist"):
    """Velocity command made of a linear and an angular Vector3.

    main() publishes instances of this type on the "cmd_vel" key, setting only
    linear.x and angular.z (all other components are 0.0).
    """
    linear: Vector3
    angular: Vector3
@dataclass
class Time(IdlStruct, typename="Time"):
    """Timestamp split into two uint32 fields; used as the type of Header.stamp."""
    # presumably seconds and nanoseconds, going by the field names - TODO confirm against the ROS2 definition
    sec: uint32
    nsec: uint32
@dataclass
class Header(IdlStruct, typename="Header"):
    """Message header: a Time stamp followed by a frame identifier string.

    Not referenced by the other definitions visible in this script (LaserScan
    declares its stamp and frame_id fields inline instead).
    """
    stamp: Time
    frame_id: str
@dataclass
class LaserScan(IdlStruct, typename="LaserScan"):
    """Laser scan message as deserialized by main()'s scan callback.

    Only `ranges` is read by this script; the other fields are declared so that
    deserialize() can walk the whole payload.
    NOTE(review): field order and types presumably have to match the ROS2
    message layout - do not reorder without confirming.
    """
    # Header content, declared inline rather than through the Header/Time classes.
    stamp_sec: uint32
    stamp_nsec: uint32
    frame_id: str
    # Scan geometry and timing (units not established by this file - see the ROS2 message definition).
    angle_min: float32
    angle_max: float32
    angle_increment: float32
    time_increment: float32
    scan_time: float32
    range_min: float32
    range_max: float32
    # Measured distances; main() indexes this list as if it held one sample per degree.
    ranges: List[float32]
    intensities: List[float32]
# Width of the front sector (in degrees, per the header comment of this script) over which
# the scan callback in main() looks for the minimal distance in LaserScan.ranges.
PROXIMITY_SCAN_ANGLE = 60
# Number of proximity levels; levels go from 0 to this value and each one maps to
# a brightness/color setting of the lightbulb.
NB_PROXIMITY_LEVEL = 4
# The last proximity level that was computed; updated by the scan callback (via `global`)
# so that a color change command is not re-published when the level didn't change.
last_prox_level = 0
def main(stdscr):
    """Run the Zigbee/MQTT <-> Zenoh <-> ROS2 teleop example.

    Parses the command line, opens a Zenoh session and then:
      - subscribes to the button key and publishes a 2 second rotation
        (Twist on the cmd_vel key) on a 'single' or 'double' action;
      - subscribes to the scan key and publishes a brightness/color JSON
        command on the light key whenever the proximity level changes;
      - reads keys from `stdscr` and publishes Twist messages for the arrow
        keys and the space bar, until ESC or 'q' is pressed.

    Args:
        stdscr: the curses window to read key strokes from (as provided by
            curses.wrapper).
    """
    # Use stdscr to get pressed keys (arrow keys to move the robot)
    stdscr.refresh()

    # --- Command line argument parsing --- --- --- --- --- ---
    parser = argparse.ArgumentParser(
        prog='zigbee-ros2-teleop',
        description='Zigbee -> MQTT -> Zenoh -> ROS2 teleop example')
    parser.add_argument('--mode', '-m', dest='mode',
                        choices=['peer', 'client'],
                        type=str,
                        help='The zenoh session mode.')
    parser.add_argument('--connect', '-e', dest='connect',
                        metavar='ENDPOINT',
                        action='append',
                        type=str,
                        help='zenoh endpoints to connect to.')
    parser.add_argument('--listen', '-l', dest='listen',
                        metavar='ENDPOINT',
                        action='append',
                        type=str,
                        help='zenoh endpoints to listen on.')
    parser.add_argument('--config', '-c', dest='config',
                        metavar='FILE',
                        type=str,
                        help='A configuration file.')
    parser.add_argument('--cmd_vel', dest='cmd_vel',
                        default='simu/rt/cmd_vel',
                        type=str,
                        help='The "cmd_vel" ROS2 topic.')
    parser.add_argument('--scan', dest='scan',
                        default='simu/rt/scan',
                        type=str,
                        help='The "scan" ROS2 topic.')
    parser.add_argument('--button', dest='button',
                        default='zigbee2mqtt/device/button',
                        type=str,
                        help='The Zenoh key to subscribe MQTT button publications.')
    parser.add_argument('--light', dest='light',
                        default='zigbee2mqtt/device/bulb/set',
                        type=str,
                        help='The Zenoh key to publish MQTT messages for light.')
    parser.add_argument('--angular_scale', '-a', dest='angular_scale',
                        default=0.5,
                        type=float,
                        help='The angular scale.')
    parser.add_argument('--linear_scale', '-x', dest='linear_scale',
                        default=0.2,
                        type=float,
                        help='The linear scale.')
    parser.add_argument('--max_range', dest='max_range',
                        default=0.8,
                        type=float,
                        help='The maximal distance above which the light will be green.')
    parser.add_argument('--min_range', dest='min_range',
                        default=0.2,
                        type=float,
                        help='The minimal distance below which the light be red.')

    # arguments parsing and config preparation
    args = parser.parse_args()
    conf = zenoh.config_from_file(args.config) if args.config is not None else zenoh.Config()
    if args.mode is not None:
        conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
    if args.connect is not None:
        conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
    if args.listen is not None:
        conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
    cmd_vel = args.cmd_vel
    scan = args.scan
    button = args.button
    light = args.light
    angular_scale = args.angular_scale
    linear_scale = args.linear_scale
    max_range = args.max_range
    min_range = args.min_range

    # zenoh-net code --- --- --- --- --- --- --- --- --- --- ---
    # initiate logging
    zenoh.init_logger()
    print("Openning session...")
    session = zenoh.open(conf)

    # to publish a Twist message to ROS2
    def pub_twist(linear, angular):
        print("Pub twist: {} - {}".format(linear, angular))
        t = Twist(linear=Vector3(x=linear, y=0.0, z=0.0),
                  angular=Vector3(x=0.0, y=0.0, z=angular))
        session.put(cmd_vel, t.serialize())

    def button_callback(sample):
        # On JSON payload reception, check 'action' (single or double) and publish Twist
        m = json.loads(sample.payload)
        print('Received sample {}'.format(sample))
        # Payloads that are not objects or that carry no 'action' are ignored
        # instead of raising inside the subscriber callback.
        action = m.get('action') if isinstance(m, dict) else None
        if action == 'single':
            direction = 1.0
        elif action == 'double':
            direction = -1.0
        else:
            return
        print(' => {}'.format(action))
        # rotate for 2 seconds, then stop
        pub_twist(0.0, direction * angular_scale)
        time.sleep(2.0)
        pub_twist(0.0, 0.0)

    def scan_callback(sample):
        global last_prox_level
        msg = LaserScan.deserialize(sample.payload)
        front = _front_proximity(msg.ranges, min_range, max_range)
        if front is None:
            # no usable measurement in the front sector
            return
        distance, prox_level = front
        # nothing to publish if the proximity level didn't change
        if prox_level == last_prox_level:
            return
        # compute brightness and color depending the proximity level
        brightness = 15 + (prox_level * 240) / NB_PROXIMITY_LEVEL
        red = round(255 * prox_level / NB_PROXIMITY_LEVEL)
        green = round(255 * (1 - (prox_level / NB_PROXIMITY_LEVEL)))
        blue = 0
        print(f"range={distance} => proximity_level={prox_level} => publish brightness={brightness},RGB=({red},{green},{blue}) to {light} ")
        # publish over zenoh the JSON message setting the lightbulb brightness and color
        # (see supported JSON attributes in https://www.zigbee2mqtt.io/devices/33943_33944_33946.html)
        # NOTE(review): the dict is handed to zenoh as-is, presumably encoded to JSON
        # by zenoh-python - confirm for the zenoh version in use.
        session.put(light, {'brightness': brightness, 'color': {'r': red, 'g': green, 'b': blue}})
        last_prox_level = prox_level

    # key code -> (linear, angular) velocities to publish
    key_moves = {
        curses.KEY_UP: (1.0 * linear_scale, 0.0),
        curses.KEY_DOWN: (-1.0 * linear_scale, 0.0),
        curses.KEY_LEFT: (0.0, 1.0 * angular_scale),
        curses.KEY_RIGHT: (0.0, -1.0 * angular_scale),
        32: (0.0, 0.0),  # space bar: stop
    }

    subscribers = []
    try:
        # Zenoh (MQTT) button subscription
        print("Subscriber on '{}'...".format(button))
        subscribers.append(session.declare_subscriber(button, button_callback))
        # Zenoh (ROS2) scan subscription
        print("Subscriber on '{}'...".format(scan))
        subscribers.append(session.declare_subscriber(scan, scan_callback))

        # Loop catching keyboard strokes (arrows+space)
        print("Waiting commands with arrow keys or space bar to stop. Press ESC or 'q' to quit.")
        while True:
            c = stdscr.getch()
            if c == 27 or c == ord('q'):
                break
            move = key_moves.get(c)
            if move is not None:
                pub_twist(*move)
    finally:
        # Release the subscribers and the session even when the loop is left
        # through an exception (e.g. Ctrl-C).
        for sub in subscribers:
            sub.undeclare()
        session.close()


def _front_proximity(ranges, min_range, max_range):
    """Return (minimal front distance, proximity level) for a list of scan ranges.

    The front sector is PROXIMITY_SCAN_ANGLE wide and centered on index 0;
    `ranges` is indexed as if it held 360 samples, one per degree. The level
    is an int clamped to [0, NB_PROXIMITY_LEVEL]: 0 at or beyond `max_range`,
    NB_PROXIMITY_LEVEL at or below `min_range`.

    Returns None when the sector holds no usable (non-NaN) measurement.
    """
    half_start = round(PROXIMITY_SCAN_ANGLE / 2)
    half_end = round(360 - PROXIMITY_SCAN_ANGLE / 2)
    # up to index 360 (exclusive) so that the last sample (index 359) is included
    sector = ranges[0:half_start] + ranges[half_end:360]
    usable = [r for r in sector if not math.isnan(r)]
    if not usable:
        return None
    distance = min(usable)
    level = (max_range - distance) * NB_PROXIMITY_LEVEL / (max_range - min_range)
    # Clamp before rounding: round() raises on an infinite value, which is
    # what an infinite distance (nothing detected) would lead to.
    return distance, round(max(min(level, NB_PROXIMITY_LEVEL), 0))
# Script entry point: curses.wrapper() sets up the terminal, calls main(stdscr)
# and restores the terminal afterwards. main() must not be called directly
# without a window: it requires the `stdscr` argument.
if __name__ == "__main__":
    curses.wrapper(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment