Skip to content

Instantly share code, notes, and snippets.

@ak64th
Last active January 9, 2020 10:00
Show Gist options
  • Save ak64th/3b1d6acb0182323b29cb9619224eb7e4 to your computer and use it in GitHub Desktop.
Save ak64th/3b1d6acb0182323b29cb9619224eb7e4 to your computer and use it in GitHub Desktop.
Use hbmqtt with prompt_toolkit
import asyncio
import logging
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_0
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout import Layout
from prompt_toolkit.widgets import Frame, TextArea
from natsort import natsorted
class Monitor:
    """Full-screen terminal view of the latest payload received per MQTT topic.

    The monitor subscribes to a topic filter on a broker and renders one
    ``topic => payload`` line per topic seen so far, naturally sorted, inside
    a read-only framed text area. Pressing Ctrl-C exits the application.
    """

    def __init__(self, broker_uri='mqtt://localhost/',
                 topic_filter='$SYS/broker/load/#'):
        """Build the MQTT client and the prompt_toolkit application.

        Args:
            broker_uri: URI passed to ``MQTTClient.connect``.
            topic_filter: Topic filter subscribed to (at QoS 0) on connect
                and unsubscribed from on disconnect.
        """
        self.broker_uri = broker_uri
        self.topic_filter = topic_filter
        # Latest decoded payload keyed by topic name.
        self.data = {}
        self.client = MQTTClient()
        self.text_area = TextArea(multiline=True, read_only=True)
        layout = Layout(
            Frame(self.text_area)
        )
        kb = KeyBindings()

        @kb.add('c-c')
        def exit_(event):
            # Exit the application that dispatched this key press.
            event.app.exit()

        self.application = Application(layout=layout, key_bindings=kb)

    async def run(self):
        """Connect, show the UI until it exits, then stop the message loop.

        The message loop task is cancelled *and awaited* so that its
        unsubscribe/disconnect cleanup finishes before this coroutine
        returns; otherwise the event loop may stop while the cleanup is
        still pending.
        """
        await self.connect()
        loop_future = asyncio.ensure_future(self.message_loop())
        try:
            await self.application.run_async()
        finally:
            loop_future.cancel()
            try:
                await loop_future
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass
            except Exception:
                # The loop died earlier (or cleanup failed); report it
                # instead of leaving the task's exception unretrieved.
                logging.exception('Message loop terminated with an error')

    async def connect(self):
        """Connect to the broker and subscribe to the topic filter at QoS 0."""
        await self.client.connect(self.broker_uri)
        await self.client.subscribe([
            (self.topic_filter, QOS_0),
        ])

    async def disconnect(self):
        """Unsubscribe from the topic filter and disconnect from the broker."""
        await self.client.unsubscribe([self.topic_filter])
        await self.client.disconnect()

    async def message_loop(self):
        """Receive messages forever, refreshing the display after each one.

        A ``ClientException`` from the client is logged and the loop keeps
        going. On cancellation the client is disconnected and the
        cancellation is re-raised so the task actually terminates.
        """
        try:
            while True:
                try:
                    message = await self.client.deliver_message()
                except ClientException as ce:
                    logging.error('Client exception: %s', ce)
                    continue
                # Replace undecodable bytes rather than letting one bad
                # payload kill the whole loop.
                self.data[message.topic] = message.data.decode(errors='replace')
                self.update_text()
        except asyncio.CancelledError:
            logging.debug('Loop exited')
            await self.disconnect()
            # Propagate so the loop does not resume on a closed client.
            raise

    def update_text(self):
        """Re-render the text area from ``self.data`` in natural sort order."""
        self.text_area.text = '\n'.join(
            f'{topic} => {data}'
            for topic, data
            in natsorted(self.data.items())
        )
if __name__ == '__main__':
    # Script entry point: configure default logging, build the monitor, and
    # drive its coroutine to completion on the current event loop.
    logging.basicConfig()
    app = Monitor()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(app.run())
prompt-toolkit~=3.0.2
hbmqtt~=0.9.5
natsort~=7.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment