Created
March 27, 2021 06:49
-
-
Save axlan/7ee9bb361e0e588531a5050e268125f8 to your computer and use it in GitHub Desktop.
A Python implementation of the local Roomba API for use as a check_mk local check
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# This script is based on http://www.richwhitehouse.com/index.php?postid=72 | |
# which in turn was based on https://github.com/koalazak/dorita980 which is | |
# an actually decent node implementation | |
# While it can be used as a library function to query Roomba status, | |
# this script can also be used as a local check in check_mk | |
# https://docs.checkmk.com/latest/en/localchecks.html to make a server | |
# have a service monitoring a roomba. Outputs a string like: | |
# "P Roomba Battery=100;10:;5:|Wifi_RSSI=-66;-80:;-100: Roomba Status" | |
import socket, ssl | |
import time | |
import json | |
import paho.mqtt.client as mqtt | |
# Don't use hard coded credentials, try to find a Roomba broadcasting its password | |
# See getRoombaPassword comment | |
SETTING_FIND_ROOMBA = False | |
SETTING_HOST = 'ROOMBA_IP_ADDRESS' | |
# For the username and password you can try the methods in this file | |
# or follow the guide in https://github.com/koalazak/dorita980#how-to-get-your-usernameblid-and-password | |
SETTING_ROOMBA_USER = 'ROOMBA_USER_NAME' | |
SETTING_ROOMBA_PASS = 'ROOMBA_PASSWORD' | |
# This sets the metrics that will be queried along with the reported error thresholds | |
SETTING_METRICS = [ | |
#displayname, status key, warn_lower:warn_upper, crit_lower:crit_upper | |
('Battery', 'batPct', '10:', '5:'), | |
('Wifi_RSSI', 'rssi', '-80:', '-100:'), | |
] | |
SETTING_TIMEOUT = 10 | |
ROOMBA_BROADCAST_PORT = 5678 | |
ROOMBA_BROADCAST_MSG = "irobotmcs" | |
ROOMBA_BROADCAST_MAX_SIZE = 1024 | |
ROOMBA_BROADCAST_TIMEOUT = 5.0 | |
ROOMBA_PASS_PORT = 8883 | |
ROOMBA_PASS_KEY = bytes((0xf0, 0x05, 0xef, 0xcc, 0x3b, 0x29, 0x00)) | |
ROOMBA_PASS_MAX_SIZE = 1024 | |
ROOMBA_TRACK_PORT = 8883 | |
ROOMBA_TRACK_KEEPALIVE = 5 | |
MQTT_TIMEOUT = 0.1 | |
MQTT_MAXPACKETS = 16 | |
CERT_PATH = "defaultcert.pem" | |
# Function to report the presence of roombas on the network | |
# the status looks like: | |
# {'ver': '3', 'hostname': 'Roomba-ROOMBA_USER_NAME', 'robotname': 'Turkey', 'ip': '192.168.1.135', 'mac': '70:66:55:0A:DD:83', 'sw': '3.5.62', 'sku': 'R675020', 'nc': 0, 'proto': 'mqtt', 'cap': {'ota': 1, 'eco': 1, 'svcConf': 1}} | |
# Where ROOMBA_USER_NAME is the username needed to authenticate the MQTT | |
def broadcastFindRoomba(): | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
s.settimeout(ROOMBA_BROADCAST_TIMEOUT) | |
s.bind(("", ROOMBA_BROADCAST_PORT)) | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
s.sendto(ROOMBA_BROADCAST_MSG.encode("UTF-8"), ("255.255.255.255", ROOMBA_BROADCAST_PORT)) | |
foundAddr = {} | |
startTime = time.time() | |
while (time.time() - startTime) < ROOMBA_BROADCAST_TIMEOUT: | |
try: | |
data, addr = s.recvfrom(ROOMBA_BROADCAST_MAX_SIZE) | |
except socket.timeout: | |
continue | |
if data and len(data) > 10: | |
#being lazy about the validation here. | |
try: | |
tryParse = json.loads(data.decode("UTF-8")) | |
#print("Received reply from Roomba:", tryParse, addr) | |
foundAddr[tryParse["robotname"]] = tryParse | |
except: | |
pass | |
time.sleep(0.1) | |
s.close() | |
return foundAddr | |
# Tries to get the password for a Roomba at IP addr | |
# You must do the following before running this function: | |
# Make sure your robot is on the Home Base and powered on. | |
# Then press and hold the HOME button on your robot until it plays a series of tones (about 2 seconds). | |
# Release the button and your robot will flash WIFI light. | |
# returns a string with the password if successful | |
def getRoombaPassword(addr): | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
ssl_sock = ssl.wrap_socket(s, ca_certs=None, cert_reqs=ssl.CERT_NONE, ssl_version=ssl.PROTOCOL_TLSv1) | |
ssl_sock.connect((addr, ROOMBA_PASS_PORT)) | |
ssl_sock.send(ROOMBA_PASS_KEY) | |
sliceFrom = 11 | |
data = bytearray() | |
while True: | |
data += ssl_sock.recv(ROOMBA_PASS_MAX_SIZE) | |
if len(data) == 2: | |
sliceFrom = 7 | |
elif len(data) >= 8: | |
return data[sliceFrom:].decode("UTF-8") | |
else: | |
#print("Unexpected response length:", len(data)) | |
#print("Make sure you held down the HOME button") | |
return None | |
ssl_sock.close() | |
return None | |
# Helper function for pulling values out of the reported state | |
def get_recursively(search_dict, field): | |
""" | |
Takes a dict with nested lists and dicts, | |
and searches all dicts for a key of the field | |
provided. | |
""" | |
fields_found = [] | |
for key, value in search_dict.items(): | |
if key == field: | |
fields_found.append(value) | |
elif isinstance(value, dict): | |
results = get_recursively(value, field) | |
for result in results: | |
fields_found.append(result) | |
elif isinstance(value, list): | |
for item in value: | |
if isinstance(item, dict): | |
more_results = get_recursively(item, field) | |
for another_result in more_results: | |
fields_found.append(another_result) | |
return fields_found | |
# MQTT Client for getting state from a Roomba | |
# In practice the Roomba usually only sends most of its status when first | |
# connected to, then mostly sends WiFi status | |
class RoombaClient: | |
# Tries to connect with the supplied credentials | |
def __init__(self, addr, username, password): | |
self.state = {} | |
try: | |
mqttc = mqtt.Client(username) | |
mqttc._userdata = self | |
#mqttc.tls_set(ca_certs=CERT_PATH, certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1) | |
# Errors on Ubuntu 20.04 | |
# See https://www.gitmemory.com/issue/keenlabs/KeenClient-Python/161/756683788 | |
# https://stackoverflow.com/questions/38015537/python-requests-exceptions-sslerror-dh-key-too-small | |
#mqttc.tls_set(ca_certs=CERT_PATH, certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE, ciphers='HIGH:!DH:!aNULL') | |
mqttc.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_NONE) | |
mqttc.username_pw_set(username, password) | |
mqttc.on_message = self.__OnRoombaMessage | |
mqttc.on_publish = self.__OnRoombaPublish | |
mqttc.on_connect = self.__OnRoombaConnect | |
mqttc.on_disconnected = self.__OnRoombaDisconnect | |
mqttc.connect(addr, ROOMBA_TRACK_PORT, ROOMBA_TRACK_KEEPALIVE) | |
self.mqttc = mqttc | |
except Exception as _: | |
#print(e) | |
#print("Exception during MQTT connection. The device may not be accepting connections, or the socket may already be in use.") | |
self.mqttc = None | |
# Reports True if the connection succeeded | |
def IsConnected(self): | |
return self.mqttc is not None | |
def __OnRoombaConnect(self, mqttc, userdata, flags, rc): | |
#print('RoombaConnect') | |
pass | |
def __OnRoombaDisconnect(self, mqttc, userdata, rc): | |
#print('RoombaDisconnect') | |
pass | |
def __OnRoombaMessage(self, mqttc, userdata, msg): | |
#print(msg.topic) | |
#print(msg.payload.decode("UTF-8")) | |
data = json.loads(msg.payload.decode("UTF-8")) | |
self.state.update(data['state']['reported']) | |
def __OnRoombaPublish(self, mqttc, userdata, mid): | |
#print('RoombaPublish') | |
pass | |
# Close connection | |
def CloseExistingMqtt(self): | |
if self.mqttc: | |
self.mqttc.disconnect() | |
self.mqttc = None | |
return True | |
return False | |
"""Return a dict of state info collected from MQTT messages | |
returns all the state fields collected since connecting | |
or the last time fresh==True was set. | |
Keyword arguments: | |
fields -- optional list of dict fields. The function will return | |
immediately if the collected state has all the specified fields | |
duration -- time to take before returning, or max duration to wait if fields are specified | |
fresh -- clear the state previously received | |
""" | |
def GetState(self, fields=None, duration=5.0, fresh=False): | |
if self.mqttc: | |
if fresh: | |
self.state = {} | |
start_time = time.time() | |
while start_time + duration > time.time(): | |
r = self.mqttc.loop(MQTT_TIMEOUT, MQTT_MAXPACKETS) | |
if r != 0: | |
#print("Lost connection to the Roomba.") | |
self.mqttc = None | |
break | |
if fields: | |
found = [ len(get_recursively(self.state, field)) > 0 for field in fields ] | |
if all(found): | |
break | |
return self.state | |
# Try to output the the Roomba status based on the SETTING variables | |
def main(): | |
try: | |
if SETTING_FIND_ROOMBA: | |
try: | |
roombas = broadcastFindRoomba() | |
roomba = next(iter(roombas.values())) | |
user = roomba['hostname'].split('-')[1] | |
host = roomba['ip'] | |
password = getRoombaPassword(host) | |
print(f'SETTING_HOST={host} SETTING_ROOMBA_USER={user} SETTING_ROOMBA_PASS={password}') | |
except: | |
print(f'3 Roomba - Connection Failure') | |
else: | |
host = SETTING_HOST | |
user = SETTING_ROOMBA_USER | |
password = SETTING_ROOMBA_PASS | |
client = RoombaClient(host, user, password) | |
if not client.IsConnected(): | |
print(f'3 Roomba - Connection Failure') | |
return | |
fields = [metric[1] for metric in SETTING_METRICS] | |
state = client.GetState(fields, SETTING_TIMEOUT) | |
#print(state) | |
metrics_strs = [] | |
for metric in SETTING_METRICS: | |
tmp = get_recursively(state, metric[1]) | |
val = '' | |
if len(tmp) > 0: | |
val = tmp[0] | |
metrics_strs.append(f'{metric[0]}={val};{metric[2]};{metric[3]}') | |
print(f'P Roomba {"|".join(metrics_strs)} Roomba Status') | |
client.CloseExistingMqtt() | |
except Exception as e: | |
print(f'3 Roomba - Script Failure {e}') | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment