Skip to content

Instantly share code, notes, and snippets.

@ingoogni
Last active March 6, 2019 06:16
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ingoogni/416caa7ed0e8b79391f1899ed9de15b5 to your computer and use it in GitHub Desktop.
Save ingoogni/416caa7ed0e8b79391f1899ed9de15b5 to your computer and use it in GitHub Desktop.
Cherrypy MQTT HBMQTT plugin
#https://stackoverflow.com/questions/26270681/can-an-asyncio-event-loop-run-in-the-background-without-suspending-the-python-in
import asyncio
import threading
import cherrypy
from cherrypy.process import plugins
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2
class MQTTrepub(plugins.SimplePlugin):
"""
Plugin that listens to MQTT topics (HBMQTT) and publishes the payload
'unmodified' to a channel on the CherryPy bus. The cherrypy channel name
is the same as the MQTT topic
Requires HBMQTT
"""
thread = None
def __init__(self, bus, broker, subscriptionlist):
plugins.SimplePlugin.__init__(self, bus)
self.broker = broker
self.subscriptionlist = subscriptionlist
self.unsubscriptionlist = [item[0] for item in subscriptionlist]
self.loop = asyncio.get_event_loop()
self.C = MQTTClient(client_id = "cherrypy")
self.running_flag = True
cherrypy.engine.subscribe("stop", self.flag_setter)
def flag_setter(self):
self.running_flag = False
@asyncio.coroutine
def cherrymqtt(self, broker, subscriptionlist):
yield from self.C.connect(broker)
yield from self.C.subscribe(subscriptionlist)
try:
while self.running_flag == True:
message = yield from self.C.deliver_message()
packet = message.publish_packet
#print(packet)
cherrypy.engine.publish(
packet.variable_header.topic_name, [
packet.variable_header.topic_name,
packet.protocol_ts, packet.payload.data
]
)
yield from self.C.unsubscribe(self.unsubscriptionlist)
yield from self.C.disconnect()
self.loop.call_soon_threadsafe(self.loop.stop)
self.stop()
except ClientException as ce:
print("Client exception: %s" % ce)
def start(self):
if not self.thread:
self.bus.log('Starting up hbmqttcherrypy')
self.thread = threading.Thread(target=self.run).start()
def stop(self):
self.bus.log('Shut down hbmqttcherrypy')
def run(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.cherrymqtt(self.broker, self.subscriptionlist))
#broker = 'mqtt://localhost/'
#subscriptionlist = [('some/topic/string/temperatures', QOS_1),]
#MQTTrepub(cherrypy.engine, broker, subscriptionlist).subscribe()
Work in progress.
A, probably crude, attempt to push MQTT topics & data onto the cherrypy bus, using a HBMQTT client as cherrypy plug in.
@canDry
Copy link

canDry commented Mar 4, 2019

Hi... I'm having trouble understanding how this works.

I have an existing cherrypy server running and would like to incorporate your code into it so that it can push and pull mqtt messages.

Hoping you could give a bit more direction on how to incorporate it into existing cherrpy code.

===================================

Figured it out. For starters I needed to upgrade py to 3.5!
You also needed to subscribe to the topic, otherwise your publish doesn't seem to go anywhere.
And, finally... when it quits it doesn't terminate the thread.

Thanks for posting this though, it was the only example I could find anywhere!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment