Skip to content

Instantly share code, notes, and snippets.

@alyoshenka
Last active September 3, 2023 20:21
Show Gist options
  • Save alyoshenka/d16565e2598877f03dd055e5751696c3 to your computer and use it in GitHub Desktop.
Save alyoshenka/d16565e2598877f03dd055e5751696c3 to your computer and use it in GitHub Desktop.
MQTT Playground for Interfacing with Home Assistant and Neopolitan
const.py
__pycache__/*
logs/*
stocks/*
import json
import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe
from neopolitan.nonblocking import command_map, open_display
from stocks.stockticker import default_tickers, snp_500, nasdaq_100
from const import broker, auth
from routes import runningState, runACommand, scrollSpeed
def publish_running_state(state):
    """Publish *state* as a single MQTT message on the ``runningState`` topic.

    The message goes to the broker named by ``broker`` using the ``auth``
    credentials imported from ``const``.
    """
    publish.single(
        topic=runningState,
        payload=state,
        hostname=broker,
        auth=auth,
    )
def run_a_command(cmd, opts=None):
    """Run the command named *cmd*.

    The name is first resolved through ``command_map``. When that yields a
    truthy callable it is invoked with *opts*, or with no arguments when
    *opts* is falsy. Otherwise *cmd* is matched against the stock-ticker
    command names, each of which calls ``open_display`` with
    ``default_tickers``, ``snp_500`` or ``nasdaq_100``. Any other name is
    reported on stdout.

    Args:
        cmd: Name of the command to run.
        opts: Optional options passed through to the mapped command.
            Defaults to ``None`` so callers with no options (for example
            ``run_a_command('close')``) do not raise ``TypeError``.
    """
    func = command_map(cmd)
    if func:
        if opts:
            func(opts)
        else:
            func()
        return

    # TODO: bad code; redesign -- these commands bypass command_map.
    ticker_sources = {
        'stockTicker': default_tickers,
        'stockTickerSNP': snp_500,
        'stockTickerNASDAQ': nasdaq_100,
    }
    # Only strings can name a ticker command; the guard also keeps an
    # unhashable cmd (e.g. a JSON list) from raising during the lookup.
    source = ticker_sources.get(cmd) if isinstance(cmd, str) else None
    if source is not None:
        open_display(source)
    else:
        print('ERROR, bad command:', cmd)
def parse_message_bytes(b):
    """Decode the bytes-like message *b* as UTF-8 and return the text."""
    text = b.decode(encoding='UTF-8')
    return text
def parse_message_string(s):
    """Parse *s* as JSON and return the decoded value.

    If parsing fails for any reason, the input and the error are printed
    and ``None`` is returned instead of raising.
    """
    try:
        parsed = json.loads(s)
    except Exception as err:
        print('Error parsing to JSON:', s, '->', err)
        return None
    return parsed
def checkScrollSpeedUpdate(payload):
    """Publish the scroll speed requested in *payload*, if there is one.

    Looks up ``payload['options']['speed']``. The preset names ``'slow'``,
    ``'medium'`` and ``'fast'`` are translated to ``0.7``, ``0.2`` and
    ``0.1``; any other value is published unchanged. The result is sent to
    the ``scrollSpeed`` topic.

    This is best-effort: a payload without a speed is ignored silently, and
    a failed publish is reported on stdout rather than raised.

    Args:
        payload: Decoded command message, or ``None`` if decoding failed.
    """
    presets = {'slow': 0.7, 'medium': 0.2, 'fast': 0.1}

    try:
        speed = payload['options']['speed']
    except (KeyError, IndexError, TypeError):
        # No options, no speed, or a payload/options value that cannot be
        # indexed this way (e.g. None): nothing to update.
        return

    # Only strings can be preset names; this also avoids hashing an
    # unhashable JSON value such as a list.
    if isinstance(speed, str):
        speed = presets.get(speed, speed)

    try:
        publish.single(scrollSpeed, speed, hostname=broker, auth=auth)
    except Exception as err:
        # Keep the caller running, but do not hide the failure.
        print('Error publishing scroll speed:', speed, '->', err)
def message_callback(client, userdata, message):
    """Handle one MQTT message passed to ``subscribe.callback``.

    Messages on the ``runACommand`` topic are expected to be a JSON object
    with an ``'operation'`` name and optional ``'options'``. When options
    are present, any scroll-speed change is published first. The operation
    is then published as the running state and executed. Messages on any
    other topic are printed.

    Malformed command payloads (invalid JSON, not an object, no operation)
    are reported on stdout and dropped, so one bad message cannot raise out
    of the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        message: Received message; ``topic`` and ``payload`` are read.
    """
    payload = parse_message_string(message.payload)

    if message.topic != runACommand:
        print("%s %s" % (message.topic, message.payload))
        return

    if not isinstance(payload, dict):
        # parse_message_string returns None on bad JSON, and valid JSON may
        # still be a list or scalar; neither can carry an operation.
        print('ERROR, command payload is not a JSON object:', message.payload)
        return

    cmd = payload.get('operation')
    if 'operation' not in payload:
        print('ERROR, command payload has no operation:', message.payload)
        return

    opts = payload.get('options')
    if 'options' in payload:
        checkScrollSpeedUpdate(payload)

    if cmd:
        publish_running_state(cmd)
        run_a_command(cmd, opts)
def main():
    """Announce the connection, process commands, then clean up.

    Publishes ``'connected'`` as the running state and hands control to
    ``subscribe.callback`` with ``message_callback`` on the ``runACommand``
    topic. When that call returns or raises, the ``'close'`` command is
    attempted and ``'disconnected'`` is published.
    """
    publish_running_state('connected')
    try:
        subscribe.callback(message_callback, runACommand, hostname=broker, auth=auth)
    finally:
        try:
            # Pass the options argument explicitly: run_a_command takes two
            # positional parameters, and omitting one raised a TypeError
            # that was silently swallowed, so 'close' never actually ran.
            run_a_command('close', None)
        except Exception as err:
            # Cleanup is best-effort, but the failure should be visible.
            print('Error running close command:', err)
        publish_running_state('disconnected')
# Run the listener only when this file is executed as a script.
if __name__ == "__main__":
    main()
# MQTT topic routes. Every topic is nested under ``home``.
home = 'hass/neopolitan'  # entry point for all routes

command = f'{home}/cmd'  # run commands
data = f'{home}/dt'  # request data

runACommand = f'{command}/run'  # request that a command be run
runningState = f'{data}/state'  # request the state of the board
scrollSpeed = f'{data}/scrollSpeed'  # scroll speed topic
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment