Skip to content

Instantly share code, notes, and snippets.

@lbt
Created January 16, 2024 10:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lbt/25bafb54ff150cf8a7548bde8757623b to your computer and use it in GitHub Desktop.
Save lbt/25bafb54ff150cf8a7548bde8757623b to your computer and use it in GitHub Desktop.
import json
from ovos_workshop.skills import OVOSSkill
from ovos_workshop.decorators import intent_handler
import paho.mqtt.client as mqtt
import paho.mqtt
class FeedTheFish(OVOSSkill):
    """OVOS skill that bridges the assistant to an MQTT broker.

    On construction the skill connects to the broker described by
    ``self.config`` and starts paho's background network loop.  Text
    published on the control topic is spoken aloud, and :meth:`send_mqtt`
    publishes messages to arbitrary topics.
    """

    # Topic whose payloads are spoken by the assistant.
    _CONTROL_TOPIC = "named/control/mycroft"

    def __init__(self, *args, **kwargs):
        """Create the MQTT client, connect and start the network loop.

        Any exception raised by ``Client.connect`` (e.g. broker unreachable)
        propagates to the caller, exactly as before.
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): broker settings are hard-coded placeholders; consider
        # moving them to the skill settings before deploying.
        self.config = {
            "protocol": "mqtt",
            "mqtt-ssl": "no",
            "mqtt-ca-cert": "/etc/ssl/certs/ca-certificates.crt",
            "mqtt-host": "mqtt.example.com",
            "mqtt-port": 1883,
            "mqtt-auth": "yes",
            "mqtt-user": "user",
            "mqtt-pass": "pass",
        }
        cfg = self.config
        host = cfg["mqtt-host"]
        port = cfg["mqtt-port"]

        # paho-mqtt 2.x takes the callback API version as the first positional
        # argument, so the client id must be passed by keyword.  VERSION1
        # matches the (client, userdata, flags, rc) callback signatures used
        # below.  paho-mqtt 1.x has no CallbackAPIVersion at all.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.mqttc = mqtt.Client(
                mqtt.CallbackAPIVersion.VERSION1, client_id="MycroftAI"
            )
        else:
            self.mqttc = mqtt.Client(client_id="MycroftAI")

        if cfg["mqtt-auth"] == "yes":
            self.mqttc.username_pw_set(cfg["mqtt-user"], cfg["mqtt-pass"])
        if cfg["mqtt-ssl"] == "yes":
            self.mqttc.tls_set(ca_certs=cfg["mqtt-ca-cert"])

        self.mqttc.on_message = self.on_message
        self.mqttc.on_connect = self.on_connect
        self.mqttc.on_subscribe = self.on_subscribe

        self.log.info(f"Connect to: {host}:{port}")
        self.mqttc.connect(host, port, keepalive=10)
        # Run the network loop in a background thread so callbacks fire
        # without blocking the skill.
        self.mqttc.loop_start()
        self.log.info("MQTT loop running")

    def shutdown(self):
        """Disconnect from the broker and stop the background loop."""
        self.mqttc.disconnect()
        self.mqttc.loop_stop()

    def on_connect(self, client, userdata, flags, rc):
        """paho callback: subscribe to the control topic once connected.

        ``rc`` is the CONNACK result code; anything other than 0 means the
        broker refused the connection, in which case subscribing is pointless.
        """
        if rc != 0:
            self.log.error(
                f"MQTT connection refused: {mqtt.connack_string(rc)}"
            )
            return
        self.log.info("Connected, subscribing")
        self.mqttc.subscribe(self._CONTROL_TOPIC, qos=2)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """paho callback: log that the subscription was acknowledged."""
        self.log.info("subscribed")

    def on_message(self, client, userdata, msg):
        """paho callback: speak any text received on the control topic."""
        self.log.info(f"Got msg : {msg.topic} = {msg.payload}")
        if msg.topic != self._CONTROL_TOPIC:
            return
        try:
            text = msg.payload.decode("utf-8")
        except UnicodeDecodeError:
            # An exception escaping this callback would surface inside paho's
            # network thread; drop the malformed payload instead.
            self.log.error(
                f"Ignoring non UTF-8 payload on {msg.topic}: {msg.payload!r}"
            )
            return
        self.speak(text)

    def send_mqtt(self, topic, payload, retain=True):
        """Publish ``payload`` to ``topic`` with QoS 2 and wait for delivery.

        Args:
            topic: MQTT topic to publish to.
            payload: Message payload handed to ``Client.publish``.
            retain: Whether the broker should retain the message.

        Returns:
            True when the message was published, False otherwise.
        """
        info = self.mqttc.publish(topic, payload, qos=2, retain=retain)
        # Check the immediate result first: waiting on a message that was
        # never sent either raises (paho >= 1.6) or can block indefinitely
        # (older paho).
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            self.log.error(f"Failed to publish: {topic} = {payload}\n{info}")
            return False
        try:
            info.wait_for_publish()
        except (ValueError, RuntimeError) as err:
            self.log.error(
                f"Failed to publish: {topic} = {payload}\n{info} ({err})"
            )
            return False
        self.log.info(f"Published: {topic} = {payload}")
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment