Skip to content

Instantly share code, notes, and snippets.

@zoide
Last active May 19, 2020 17:46
Show Gist options
  • Save zoide/ba77a79e896bbed024a9a7a244ad4c0e to your computer and use it in GitHub Desktop.
Save zoide/ba77a79e896bbed024a9a7a244ad4c0e to your computer and use it in GitHub Desktop.
MQTT Power Plugin for octoprint
import octoprint.plugin
import subprocess
class MqttPrinterPowerPlugin(octoprint.plugin.StartupPlugin,
                             octoprint.plugin.SettingsPlugin):
    """Start/stop printer services in response to MQTT power messages.

    After OctoPrint has started, the plugin obtains the publish/subscribe
    helpers exported by the "mqtt" plugin, announces itself on
    ``<baseTopic>plugin/PrinterPower`` and subscribes to ``<baseTopic>power``.

    A payload of ``ON`` starts the ``klipper`` and ``webcamd`` services and
    connects the printer; ``OFF`` disconnects the printer and stops both
    services. Any other payload is ignored.
    """

    def __init__(self):
        # No-op placeholders so the attributes are always callable, even
        # when the mqtt plugin's helpers are unavailable.
        self.mqtt_publish = lambda *args, **kwargs: None
        self.mqtt_subscribe = lambda *args, **kwargs: None
        self.mqtt_unsubscribe = lambda *args, **kwargs: None
        self._baseTopic = ""

    def on_after_startup(self):
        """Wire up the MQTT helpers and subscribe to the power topic."""
        helpers = self._plugin_manager.get_helpers(
            "mqtt", "mqtt_publish", "mqtt_subscribe", "mqtt_unsubscribe")
        if not helpers:
            # Without the helpers there is nothing to publish or subscribe
            # to, and the base topic setting may not exist either.
            self._logger.warning(
                "MQTT helpers not available; printer power control via "
                "MQTT is disabled")
            return

        self.mqtt_publish = helpers.get("mqtt_publish", self.mqtt_publish)
        self.mqtt_subscribe = helpers.get(
            "mqtt_subscribe", self.mqtt_subscribe)
        self.mqtt_unsubscribe = helpers.get(
            "mqtt_unsubscribe", self.mqtt_unsubscribe)

        # Fall back to an empty prefix rather than failing with a TypeError
        # on ``None + str`` when the setting is missing.
        base_topic = self._settings.global_get(
            ["plugins", "mqtt", "publish", "baseTopic"])
        self._baseTopic = base_topic or ""

        self.mqtt_publish(
            self._baseTopic + "plugin/PrinterPower", "plugin startup")
        self.mqtt_subscribe(
            self._baseTopic + "power", self._on_mqtt_subscription)

    def _run_service(self, service, action):
        """Run ``sudo service <service> <action>`` and log any failure.

        Returns True when the command exited with status 0, False otherwise.
        Failures are logged instead of raised so that one failing service
        does not abort the rest of the power sequence.
        """
        command = ["sudo", "service", service, action]
        try:
            returncode = subprocess.call(command)
        except OSError as exc:
            self._logger.error(
                "Could not run %s: %s", " ".join(command), exc)
            return False
        if returncode != 0:
            self._logger.warning(
                "%s exited with status %s", " ".join(command), returncode)
            return False
        return True

    def _on_mqtt_subscription(self, topic, message, retained=None, qos=None,
                              *args, **kwargs):
        """Handle a message received on the power topic.

        ``message`` may be ``bytes`` (the raw MQTT payload on Python 3) or
        text; it is normalized to text before comparison so the plugin works
        on both Python 2 and Python 3.
        """
        if isinstance(message, bytes):
            message = message.decode("utf-8", "replace")
        if not isinstance(message, type(u"")):
            # Not a textual payload; nothing sensible to compare against.
            return
        command = message.strip()

        if command == "ON":
            # Services must be up before OctoPrint tries to connect.
            self._run_service("klipper", "start")
            self._run_service("webcamd", "start")
            self._printer.connect()
        elif command == "OFF":
            # Disconnect first so the serial link is closed cleanly before
            # the services behind it go away.
            self._printer.disconnect()
            self._run_service("klipper", "stop")
            self._run_service("webcamd", "stop")
# Module-level plugin metadata; these names are presumably picked up by
# OctoPrint's plugin loader when it imports this file.
# Name under which the plugin identifies itself.
__plugin_name__ = "drucki.chaos MQTTPrinterPower handling"
# Declared Python compatibility range: 2.7 up to (but excluding) 4.
__plugin_pythoncompat__ = ">=2.7,<4"
# The plugin instance(s) provided by this module.
__plugin_implementations__ = [MqttPrinterPowerPlugin()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment