Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A Python implementation of the local Roomba API for use as a check_mk local check
#!/usr/bin/env python3
# This script is based on http://www.richwhitehouse.com/index.php?postid=72
# which in turn was based on https://github.com/koalazak/dorita980 which is
# an actually decent node implementation
# While it can be used as a library function to query Roomba status,
# this script can also be used as a local check in check_mk
# https://docs.checkmk.com/latest/en/localchecks.html to make a server
# have a service monitoring a roomba. Outputs a string like:
# "P Roomba Battery=100;10:;5:|Wifi_RSSI=-66;-80:;-100: Roomba Status"
import socket, ssl
import time
import json
import paho.mqtt.client as mqtt
# When True, main() ignores the hard coded host and credentials below and
# instead tries to find a Roomba on the network and read its password.
# See the getRoombaPassword comment for what must be done on the robot first.
SETTING_FIND_ROOMBA = False
# Address of the Roomba to connect to when SETTING_FIND_ROOMBA is False
SETTING_HOST = 'ROOMBA_IP_ADDRESS'
# For the username and password you can try the methods in this file
# or follow the guide in https://github.com/koalazak/dorita980#how-to-get-your-usernameblid-and-password
SETTING_ROOMBA_USER = 'ROOMBA_USER_NAME'
SETTING_ROOMBA_PASS = 'ROOMBA_PASSWORD'
# This sets the metrics that will be queried along with the reported error thresholds.
# The threshold strings are copied verbatim into the printed metric output.
SETTING_METRICS = [
# (display name, key looked up in the reported state, warn_lower:warn_upper, crit_lower:crit_upper)
('Battery', 'batPct', '10:', '5:'),
('Wifi_RSSI', 'rssi', '-80:', '-100:'),
]
# Maximum time in seconds main() lets GetState wait for the metrics above
SETTING_TIMEOUT = 10
# UDP port used for discovery: the socket is bound to it and the broadcast is sent to it
ROOMBA_BROADCAST_PORT = 5678
# Payload of the discovery broadcast
ROOMBA_BROADCAST_MSG = "irobotmcs"
# Receive buffer size in bytes for discovery replies
ROOMBA_BROADCAST_MAX_SIZE = 1024
# Seconds to listen for discovery replies (also used as the socket timeout)
ROOMBA_BROADCAST_TIMEOUT = 5.0
# TCP port getRoombaPassword connects to (over TLS)
ROOMBA_PASS_PORT = 8883
# Bytes sent to the robot by getRoombaPassword to request the password
ROOMBA_PASS_KEY = bytes((0xf0, 0x05, 0xef, 0xcc, 0x3b, 0x29, 0x00))
# Receive buffer size in bytes for the password reply
ROOMBA_PASS_MAX_SIZE = 1024
# Port and keepalive passed to the MQTT connect call in RoombaClient
ROOMBA_TRACK_PORT = 8883
ROOMBA_TRACK_KEEPALIVE = 5
# Arguments passed to every MQTT loop() call in RoombaClient.GetState
MQTT_TIMEOUT = 0.1
MQTT_MAXPACKETS = 16
# Path to a certificate file.
# NOTE(review): not referenced by any active (non-commented) code visible in this file.
CERT_PATH = "defaultcert.pem"
# Function to report the presence of roombas on the network
# the status looks like:
# {'ver': '3', 'hostname': 'Roomba-ROOMBA_USER_NAME', 'robotname': 'Turkey', 'ip': '192.168.1.135', 'mac': '70:66:55:0A:DD:83', 'sw': '3.5.62', 'sku': 'R675020', 'nc': 0, 'proto': 'mqtt', 'cap': {'ota': 1, 'eco': 1, 'svcConf': 1}}
# where ROOMBA_USER_NAME is the username needed to authenticate the MQTT connection.
def broadcastFindRoomba():
    """Broadcast a discovery message and collect the Roombas that answer.

    Listens for replies for ROOMBA_BROADCAST_TIMEOUT seconds in total.

    Returns:
        A dict mapping each reply's "robotname" to the parsed JSON reply.
        Empty if nothing usable was received.

    Raises:
        OSError: if the socket cannot be bound or the broadcast cannot be sent.
    """
    found = {}
    # The context manager closes the socket even if bind/sendto raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind(("", ROOMBA_BROADCAST_PORT))
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.sendto(ROOMBA_BROADCAST_MSG.encode("UTF-8"), ("255.255.255.255", ROOMBA_BROADCAST_PORT))
        # A monotonic deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + ROOMBA_BROADCAST_TIMEOUT
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Only wait for what is left of the window so the whole call
            # never runs longer than ROOMBA_BROADCAST_TIMEOUT.
            s.settimeout(remaining)
            try:
                data, _addr = s.recvfrom(ROOMBA_BROADCAST_MAX_SIZE)
            except socket.timeout:
                break
            # Being lazy about the validation here: very short datagrams are
            # ignored (presumably this drops our own broadcast, which arrives
            # on the same port).
            if len(data) <= 10:
                continue
            try:
                reply = json.loads(data.decode("UTF-8"))
                found[reply["robotname"]] = reply
            except (ValueError, KeyError, TypeError):
                # Not UTF-8 / not JSON / not a reply with a robotname: skip it.
                continue
    return found
# Tries to get the password for a Roomba at IP addr
# You must do the following before running this function:
# Make sure your robot is on the Home Base and powered on.
# Then press and hold the HOME button on your robot until it plays a series of tones (about 2 seconds).
# Release the button and your robot will flash WIFI light.
# returns a string with the password if successful
def getRoombaPassword(addr, timeout=10.0):
    """Ask the Roomba at addr for its password.

    The robot has to be prepared as described in the comment above this
    function before it will answer.

    Args:
        addr: IP address (or hostname) of the Roomba.
        timeout: Seconds to wait on each socket operation. None waits
            forever, which is what this function did before the parameter
            existed.

    Returns:
        The password as a string, or None if the reply was too short or the
        robot closed the connection before sending it.

    Raises:
        OSError: on connection, TLS or timeout errors (socket.timeout and
            ssl.SSLError are subclasses).
    """
    # ssl.wrap_socket() no longer exists in Python 3.12+, so build the
    # equivalent unverified client context explicitly.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False  # must be disabled before CERT_NONE is set
    context.verify_mode = ssl.CERT_NONE
    if hasattr(ssl, "TLSVersion"):
        # This code used to pin TLSv1; keep that version allowed so a robot
        # that only speaks TLSv1 can still connect.
        # NOTE(review): confirm which TLS versions the target firmware needs.
        context.minimum_version = ssl.TLSVersion.TLSv1

    slice_from = 11
    data = bytearray()
    # Both context managers guarantee the sockets are closed on every path.
    with socket.create_connection((addr, ROOMBA_PASS_PORT), timeout=timeout) as raw_sock:
        with context.wrap_socket(raw_sock) as ssl_sock:
            ssl_sock.sendall(ROOMBA_PASS_KEY)
            while True:
                chunk = ssl_sock.recv(ROOMBA_PASS_MAX_SIZE)
                if not chunk:
                    # Peer closed the connection; without this check a 2-byte
                    # reply followed by a close would loop forever.
                    return None
                data += chunk
                if len(data) == 2:
                    # Only a 2-byte reply so far: keep reading and use the
                    # shorter offset for the accumulated data.
                    slice_from = 7
                elif len(data) >= 8:
                    return data[slice_from:].decode("UTF-8")
                else:
                    # Unexpected response length; make sure the HOME button
                    # was held down on the robot.
                    return None
# Helper function for pulling values out of the reported state
def get_recursively(search_dict, field):
    """Collect every value stored under the key field in a nested structure.

    Takes a dict with nested lists and dicts, and searches all dicts for a
    key of the field provided. Dicts inside lists are searched at any list
    nesting depth. A value stored under a matching key is returned as is and
    not searched any further.

    Args:
        search_dict: The dict to search.
        field: The key to look for.

    Returns:
        A list of the matching values in traversal order (empty if none).
    """
    fields_found = []
    for key, value in search_dict.items():
        if key == field:
            fields_found.append(value)
        elif isinstance(value, dict):
            fields_found.extend(get_recursively(value, field))
        elif isinstance(value, list):
            fields_found.extend(_get_recursively_in_list(value, field))
    return fields_found


def _get_recursively_in_list(items, field):
    """Apply get_recursively to every dict reachable through nested lists."""
    fields_found = []
    for item in items:
        if isinstance(item, dict):
            fields_found.extend(get_recursively(item, field))
        elif isinstance(item, list):
            fields_found.extend(_get_recursively_in_list(item, field))
    return fields_found
# MQTT Client for getting state from a Roomba
# In practice the Roomba usually only sends most of its status when first
# connected to, then mostly sends WiFi status
class RoombaClient:
    """MQTT client that accumulates the state reported by a Roomba.

    Every received message that carries a state.reported object has that
    object merged into self.state.
    """

    def __init__(self, addr, username, password):
        """Try to connect with the supplied credentials.

        Connection errors are not raised; use IsConnected() to find out
        whether the connection attempt succeeded.
        """
        self.state = {}
        try:
            mqttc = mqtt.Client(username)
            mqttc.user_data_set(self)
            # Certificate verification is disabled. Earlier variants of this
            # call passed ca_certs=CERT_PATH together with
            # tls_version=ssl.PROTOCOL_TLSv1 or ciphers='HIGH:!DH:!aNULL';
            # those errored on Ubuntu 20.04, see
            # https://www.gitmemory.com/issue/keenlabs/KeenClient-Python/161/756683788
            # https://stackoverflow.com/questions/38015537/python-requests-exceptions-sslerror-dh-key-too-small
            mqttc.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE)
            mqttc.username_pw_set(username, password)
            mqttc.on_message = self.__OnRoombaMessage
            mqttc.on_publish = self.__OnRoombaPublish
            mqttc.on_connect = self.__OnRoombaConnect
            # The paho attribute is on_disconnect; "on_disconnected" only
            # created an unused attribute, so the handler was never called.
            mqttc.on_disconnect = self.__OnRoombaDisconnect
            mqttc.connect(addr, ROOMBA_TRACK_PORT, ROOMBA_TRACK_KEEPALIVE)
            self.mqttc = mqttc
        except Exception:
            # Deliberate best effort: the device may not be accepting
            # connections, or the socket may already be in use.
            self.mqttc = None

    def IsConnected(self):
        """Report True if the connection succeeded and has not been dropped."""
        return self.mqttc is not None

    def __OnRoombaConnect(self, mqttc, userdata, flags, rc):
        """Connect callback; intentionally does nothing."""
        pass

    def __OnRoombaDisconnect(self, mqttc, userdata, rc):
        """Disconnect callback; intentionally does nothing."""
        pass

    def __OnRoombaMessage(self, mqttc, userdata, msg):
        """Merge the state.reported object of a message into self.state.

        Messages that are not UTF-8 JSON or that carry no state.reported
        object are ignored instead of raising inside the MQTT callback.
        """
        try:
            data = json.loads(msg.payload.decode("UTF-8"))
            reported = data['state']['reported']
        except (ValueError, KeyError, TypeError):
            return
        if isinstance(reported, dict):
            self.state.update(reported)

    def __OnRoombaPublish(self, mqttc, userdata, mid):
        """Publish callback; intentionally does nothing."""
        pass

    def CloseExistingMqtt(self):
        """Close the connection.

        Returns True if there was a connection to close, False otherwise.
        """
        if self.mqttc:
            self.mqttc.disconnect()
            self.mqttc = None
            return True
        return False

    def GetState(self, fields=None, duration=5.0, fresh=False):
        """Return a dict of state info collected from MQTT messages.

        Returns all the state fields collected since connecting
        or the last time fresh==True was set. Without a connection the
        state collected so far is returned unchanged.

        Keyword arguments:
        fields -- optional list of dict fields. The function will return
            as soon as the collected state has all the specified fields
        duration -- time in seconds to take before returning, or max
            duration to wait if fields are specified
        fresh -- clear the state previously received
        """
        if not self.mqttc:
            return self.state
        if fresh:
            self.state = {}
        # Monotonic clock: a wall-clock adjustment must not stretch or cut
        # the wait.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            if self.mqttc.loop(MQTT_TIMEOUT, MQTT_MAXPACKETS) != 0:
                # Lost connection to the Roomba.
                self.mqttc = None
                break
            if fields and all(get_recursively(self.state, field) for field in fields):
                break
        return self.state
# Try to output the Roomba status based on the SETTING variables
def main():
    """Print one check_mk local check line describing the Roomba.

    With SETTING_FIND_ROOMBA set, the host and credentials are discovered on
    the network (and printed so they can be copied into the settings);
    otherwise the SETTING_* values are used. Every failure is reported as a
    status 3 line instead of raising.
    """
    client = None
    try:
        if SETTING_FIND_ROOMBA:
            try:
                roombas = broadcastFindRoomba()
                roomba = next(iter(roombas.values()))
                user = roomba['hostname'].split('-')[1]
                host = roomba['ip']
                password = getRoombaPassword(host)
            except Exception:
                print('3 Roomba - Connection Failure')
                # Without this return the code below ran with host/user/
                # password unbound and printed a second, misleading line.
                return
            print(f'SETTING_HOST={host} SETTING_ROOMBA_USER={user} SETTING_ROOMBA_PASS={password}')
        else:
            host = SETTING_HOST
            user = SETTING_ROOMBA_USER
            password = SETTING_ROOMBA_PASS

        client = RoombaClient(host, user, password)
        if not client.IsConnected():
            print('3 Roomba - Connection Failure')
            return

        fields = [metric[1] for metric in SETTING_METRICS]
        state = client.GetState(fields, SETTING_TIMEOUT)

        metrics_strs = []
        missing = []
        for display_name, key, warn, crit in SETTING_METRICS:
            values = get_recursively(state, key)
            if values:
                metrics_strs.append(f'{display_name}={values[0]};{warn};{crit}')
            else:
                # A metric with an empty value is not valid output, so a
                # value that never arrived is left out and named in the text.
                missing.append(display_name)

        if not metrics_strs:
            print('3 Roomba - No status received')
            return
        summary = 'Roomba Status'
        if missing:
            summary += f' (no data for {", ".join(missing)})'
        print(f'P Roomba {"|".join(metrics_strs)} {summary}')
    except Exception as e:
        print(f'3 Roomba - Script Failure {e}')
    finally:
        # Close the MQTT connection on every path, including errors.
        if client is not None:
            client.CloseExistingMqtt()
# Only run the check when executed as a script, so the module can also be
# imported as a library without side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment